From fc0bf1acc1f5d90f7c1b432df907f4b5f9f3e31a Mon Sep 17 00:00:00 2001 From: momo5502 Date: Fri, 27 Jan 2017 17:21:10 +0100 Subject: [PATCH] [IPC] Basic IPC channel class --- src/Components/Modules/IPCPipe.cpp | 67 +++++-------------- src/STDInclude.hpp | 6 ++ src/Utils/IPC.cpp | 104 +++++++++++++++++++++++++++++ src/Utils/IPC.hpp | 55 +++++++++++++++ src/Worker/Main.cpp | 12 +++- src/Worker/STDInclude.hpp | 6 ++ 6 files changed, 200 insertions(+), 50 deletions(-) create mode 100644 src/Utils/IPC.cpp create mode 100644 src/Utils/IPC.hpp diff --git a/src/Components/Modules/IPCPipe.cpp b/src/Components/Modules/IPCPipe.cpp index 3cdd88b9..640ca0f7 100644 --- a/src/Components/Modules/IPCPipe.cpp +++ b/src/Components/Modules/IPCPipe.cpp @@ -1,22 +1,5 @@ #include "STDInclude.hpp" -namespace boost -{ - typedef unsigned long long ulong_long_type; -} - -#pragma warning(push) -#pragma warning(disable: 4091) -#pragma warning(disable: 4996) - -#undef sleep -//#define BOOST_DISABLE_WIN32 -#define BOOST_USE_WINDOWS_H -#define BOOST_DATE_TIME_NO_LIB -#include - -#pragma warning(pop) - namespace Components { Pipe IPCPipe::ServerPipe; @@ -223,38 +206,6 @@ namespace Components IPCPipe::IPCPipe() { - Command::Add("mq1", [](Command::Params*) - { - boost::interprocess::message_queue::remove("message_queue"); - - //Create a message_queue. - boost::interprocess::message_queue mq(boost::interprocess::create_only, "message_queue", 100, sizeof(int)); - - int i = 1; - mq.send(&i, sizeof(i), 0); - }); - - Command::Add("mq2", [](Command::Params*) - { - boost::interprocess::message_queue mq(boost::interprocess::open_only, "message_queue"); - - unsigned int priority; - boost::interprocess::message_queue::size_type recvd_size; - - //Receive 100 numbers - for (int i = 0; i < 3; ++i) { - int number; - if (!mq.try_receive(&number, sizeof(number), recvd_size, priority) || recvd_size != sizeof(number)) - { - Logger::Print("Nothing received\n"); - } - else - { - Logger::Print("Rec: %d\n", number); - } - } - }); - if (Dedicated::IsEnabled()) return; // Server pipe @@ -285,6 +236,24 @@ namespace Components IPCPipe::Write("ping", ""); }); + static Utils::IPC::Channel channel("iw4xChannel"); + static Utils::IPC::Channel channel2("iw4xChannel2"); + channel.send("Hello world!"); + + Command::Add("ipcchan", [](Command::Params* params) + { + channel.send(params->join(1)); + }); + + QuickPatch::OnFrame([]() + { + std::string buffer; + if(channel2.receive(&buffer)) + { + Logger::Print("Data received: %s\n", buffer.data()); + } + }); + STARTUPINFOA sInfo; PROCESS_INFORMATION pInfo; diff --git a/src/STDInclude.hpp b/src/STDInclude.hpp index fb6447fe..2667fb2d 100644 --- a/src/STDInclude.hpp +++ b/src/STDInclude.hpp @@ -18,6 +18,11 @@ #include #include +#pragma warning(push) +#pragma warning(disable: 4996) +#include +#pragma warning(pop) + #include #include #include @@ -89,6 +94,7 @@ template class Sizer { }; #endif #include "Utils/IO.hpp" +#include "Utils/IPC.hpp" #include "Utils/CSV.hpp" #include "Utils/Time.hpp" #include "Utils/Cache.hpp" diff --git a/src/Utils/IPC.cpp b/src/Utils/IPC.cpp new file mode 100644 index 00000000..e763be18 --- /dev/null +++ b/src/Utils/IPC.cpp @@ -0,0 +1,104 @@ +#include "STDInclude.hpp" + +namespace Utils +{ + namespace IPC + { + Channel::Channel(std::string _name, int queueSize, int bufferSize) : name(_name) + { + //boost::interprocess::message_queue::remove(this->name.data()); + queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), queueSize, bufferSize + sizeof(Channel::Header))); + } + + Channel::~Channel() + { + boost::interprocess::message_queue::remove(this->name.data()); + } + + bool Channel::receive(std::string* data) + { + if (!data) return false; + data->clear(); + + if(this->packet.size() < this->queue->get_max_msg_size()) + { + packet.resize(this->queue->get_max_msg_size()); + } + + const Channel::Header* header = reinterpret_cast(packet.data()); + const char* buffer = reinterpret_cast(header + 1); + + unsigned int priority; + boost::interprocess::message_queue::size_type recvd_size; + + if (this->queue->try_receive(const_cast(packet.data()), packet.size(), recvd_size, priority)) + { + if ((recvd_size - sizeof(Channel::Header)) != header->fragmentSize || header->fragmentPart) return false; + data->append(buffer, header->fragmentSize); + + if(header->fragmented) + { + Channel::Header mainHeader = *header; + unsigned int part = mainHeader.fragmentPart; + + while (true) + { + this->queue->receive(const_cast(packet.data()), packet.size(), recvd_size, priority); + + if (header->packetId != mainHeader.packetId || header->totalSize != mainHeader.totalSize || header->fragmentPart != (++part)) + { + data->clear(); + return false; + } + + data->append(buffer, header->fragmentSize); + + if(header->totalSize <= data->size()) + { + break; + } + } + } + + return true; + } + + return false; + } + + void Channel::send(std::string data) + { + const size_t fragmentSize = (this->queue->get_max_msg_size() - sizeof(Channel::Header)) - 1; + + Channel::Header header; + header.fragmented = (data.size() + sizeof(Channel::Header)) > this->queue->get_max_msg_size(); + header.packetId = static_cast(timeGetTime() + rand());; + header.totalSize = data.size(); + + size_t sentSize = 0; + for (unsigned short i = 0; sentSize < data.size(); ++i) + { + header.fragmentPart = i; + header.fragmentSize = std::min(fragmentSize, data.size() - (fragmentSize * i)); + + std::string buffer; + buffer.append(reinterpret_cast(&header), sizeof(Channel::Header)); + buffer.append(data.data() + sentSize, header.fragmentSize); + Channel::sendMessage(buffer); + + sentSize += header.fragmentSize; + } + } + + void Channel::sendMessage(std::string data) + { + if (data.size() <= this->queue->get_max_msg_size()) + { + while (!this->queue->try_send(data.data(), data.size(), 0)) + { + std::this_thread::sleep_for(100us); + } + } + } + } +} diff --git a/src/Utils/IPC.hpp b/src/Utils/IPC.hpp new file mode 100644 index 00000000..72c72382 --- /dev/null +++ b/src/Utils/IPC.hpp @@ -0,0 +1,55 @@ +#pragma once + +namespace boost +{ + typedef unsigned long long ulong_long_type; +} + +#pragma warning(push) +#pragma warning(disable: 4091) +#pragma warning(disable: 4996) +#define _SCL_SECURE_NO_WARNINGS + +#ifdef sleep +#undef sleep +#endif + +//#define BOOST_DISABLE_WIN32 +#define BOOST_USE_WINDOWS_H +#define BOOST_DATE_TIME_NO_LIB +#include + +#undef _SCL_SECURE_NO_WARNINGS +#pragma warning(pop) + +namespace Utils +{ + namespace IPC + { + class Channel + { + public: + Channel(std::string _name, int queueSize = 100, int bufferSize = 20); + ~Channel(); + + bool receive(std::string* data); + void send(std::string data); + + private: + struct Header + { + bool fragmented; + unsigned short packetId; + size_t fragmentSize; + size_t totalSize; + unsigned int fragmentPart; + }; + + void sendMessage(std::string data); + + std::unique_ptr queue; + std::string packet; + std::string name; + }; + } +} diff --git a/src/Worker/Main.cpp b/src/Worker/Main.cpp index d32947d6..6050ff4f 100644 --- a/src/Worker/Main.cpp +++ b/src/Worker/Main.cpp @@ -4,9 +4,18 @@ void worker(bool* terminator) { printf("Worker started\n"); + Utils::IPC::Channel channel("iw4xChannel"); + Utils::IPC::Channel channel2("iw4xChannel2"); while(!*terminator) { + std::string buffer; + if(channel.receive(&buffer)) + { + printf("Data received: %s\n", buffer.data()); + channel2.send("OK " + buffer); + } + std::this_thread::sleep_for(1ms); } @@ -51,7 +60,8 @@ int main() if (runner.joinable()) runner.join(); printf("Worker terminated\n"); - _getch(); + //_getch(); + google::protobuf::ShutdownProtobufLibrary(); return 0; } diff --git a/src/Worker/STDInclude.hpp b/src/Worker/STDInclude.hpp index 12ede63f..2237256d 100644 --- a/src/Worker/STDInclude.hpp +++ b/src/Worker/STDInclude.hpp @@ -18,6 +18,11 @@ #include #include +#pragma warning(push) +#pragma warning(disable: 4996) +#include +#pragma warning(pop) + #include #include #include @@ -72,6 +77,7 @@ template class Sizer { }; #pragma warning(pop) #include "../Utils/IO.hpp" +#include "../Utils/IPC.hpp" #include "../Utils/Time.hpp" #include "../Utils/Chain.hpp" #include "../Utils/Utils.hpp"