From 559195c5cc521d818b3efc5661d12c18b60a2a08 Mon Sep 17 00:00:00 2001 From: momo5502 Date: Fri, 27 Jan 2017 22:45:01 +0100 Subject: [PATCH] [IPCHandler] Register basic ipc handler --- src/Components/Loader.cpp | 2 +- src/Components/Loader.hpp | 2 +- src/Components/Modules/ConnectProtocol.cpp | 10 +- src/Components/Modules/IPCHandler.cpp | 83 ++++++ src/Components/Modules/IPCHandler.hpp | 33 +++ src/Components/Modules/IPCPipe.cpp | 296 --------------------- src/Components/Modules/IPCPipe.hpp | 83 ------ src/Proto/ipc.proto | 9 + src/STDInclude.hpp | 1 + src/Worker/Runner.cpp | 4 +- 10 files changed, 135 insertions(+), 388 deletions(-) create mode 100644 src/Components/Modules/IPCHandler.cpp create mode 100644 src/Components/Modules/IPCHandler.hpp delete mode 100644 src/Components/Modules/IPCPipe.cpp delete mode 100644 src/Components/Modules/IPCPipe.hpp create mode 100644 src/Proto/ipc.proto diff --git a/src/Components/Loader.cpp b/src/Components/Loader.cpp index 7e731941..7afac788 100644 --- a/src/Components/Loader.cpp +++ b/src/Components/Loader.cpp @@ -43,7 +43,6 @@ namespace Components Loader::Register(new Command()); Loader::Register(new Console()); Loader::Register(new Friends()); - Loader::Register(new IPCPipe()); Loader::Register(new ModList()); Loader::Register(new Network()); Loader::Register(new Theatre()); @@ -68,6 +67,7 @@ namespace Components Loader::Register(new BitMessage()); #endif Loader::Register(new FileSystem()); + Loader::Register(new IPCHandler()); Loader::Register(new ModelSurfs()); Loader::Register(new PlayerName()); Loader::Register(new QuickPatch()); diff --git a/src/Components/Loader.hpp b/src/Components/Loader.hpp index 696897ae..f9da13d3 100644 --- a/src/Components/Loader.hpp +++ b/src/Components/Loader.hpp @@ -59,7 +59,6 @@ namespace Components #include "Modules/Window.hpp" #include "Modules/Command.hpp" #include "Modules/Console.hpp" -#include "Modules/IPCPipe.hpp" #include "Modules/UIScript.hpp" #include "Modules/ModList.hpp" #include "Modules/Network.hpp" @@ -86,6 +85,7 @@ namespace Components #include "Modules/Threading.hpp" #include "Modules/BitMessage.hpp" #include "Modules/FileSystem.hpp" +#include "Modules/IPCHandler.hpp" #include "Modules/ModelSurfs.hpp" #include "Modules/PlayerName.hpp" #include "Modules/QuickPatch.hpp" diff --git a/src/Components/Modules/ConnectProtocol.cpp b/src/Components/Modules/ConnectProtocol.cpp index defdd32f..44b417f4 100644 --- a/src/Components/Modules/ConnectProtocol.cpp +++ b/src/Components/Modules/ConnectProtocol.cpp @@ -212,10 +212,10 @@ namespace Components ConnectProtocol::ConnectProtocol() { // IPC handler - IPCPipe::On("connect", [] (std::string data) - { - Command::Execute(Utils::String::VA("connect %s", data.data()), false); - }); +// IPCPipe::On("connect", [] (std::string data) +// { +// Command::Execute(Utils::String::VA("connect %s", data.data()), false); +// }); // Invocation handler QuickPatch::Once(ConnectProtocol::Invocation); @@ -229,7 +229,7 @@ namespace Components { if (!Singleton::IsFirstInstance()) { - IPCPipe::Write("connect", ConnectProtocol::ConnectString); + //IPCPipe::Write("connect", ConnectProtocol::ConnectString); ExitProcess(0); } else diff --git a/src/Components/Modules/IPCHandler.cpp b/src/Components/Modules/IPCHandler.cpp new file mode 100644 index 00000000..d0c66eb4 --- /dev/null +++ b/src/Components/Modules/IPCHandler.cpp @@ -0,0 +1,83 @@ +#include "STDInclude.hpp" + +namespace Components +{ + std::unordered_map IPCHandler::WorkerCallbacks; + std::unordered_map IPCHandler::ClientCallbacks; + + std::unique_ptr IPCHandler::WorkerChannel; + std::unique_ptr IPCHandler::ClientChannel; + + void IPCHandler::SendWorker(std::string message, std::string data) + { + IPCHandler::InitChannels(); + + } + + void IPCHandler::SendClient(std::string message, std::string data) + { + IPCHandler::InitChannels(); + //IPCHandler::ClientChannel->send() + } + + void IPCHandler::OnWorker(std::string message, IPCHandler::Callback callback) + { + IPCHandler::WorkerCallbacks[message] = callback; + } + + void IPCHandler::OnClient(std::string message, IPCHandler::Callback callback) + { + IPCHandler::ClientCallbacks[message] = callback; + } + + void IPCHandler::InitChannels() + { + if (!IPCHandler::WorkerChannel) + { + IPCHandler::WorkerChannel.reset(new Utils::IPC::BidirectionalChannel("IW4x-Worker-Channel", !Worker::IsWorker())); + } + + if (!IPCHandler::ClientChannel) + { + IPCHandler::ClientChannel.reset(new Utils::IPC::BidirectionalChannel("IW4x-Client-Channel", Singleton::IsFirstInstance())); + } + } + + void IPCHandler::StartWorker() + { + STARTUPINFOA sInfo; + PROCESS_INFORMATION pInfo; + + ZeroMemory(&sInfo, sizeof(sInfo)); + ZeroMemory(&pInfo, sizeof(pInfo)); + sInfo.cb = sizeof(sInfo); + + CreateProcessA("iw4x.exe", const_cast(Utils::String::VA("-parent %d", GetCurrentProcessId())), nullptr, nullptr, false, NULL, nullptr, nullptr, &sInfo, &pInfo); + + if (pInfo.hThread && pInfo.hThread != INVALID_HANDLE_VALUE) CloseHandle(pInfo.hThread); + if (pInfo.hProcess && pInfo.hProcess != INVALID_HANDLE_VALUE) CloseHandle(pInfo.hProcess); + } + + IPCHandler::IPCHandler() + { + if (Dedicated::IsEnabled()) return; + + IPCHandler::InitChannels(); + IPCHandler::StartWorker(); + + QuickPatch::OnFrame([]() + { + std::string buffer; + if(IPCHandler::WorkerChannel->receive(&buffer)) + { + Logger::Print("Data received: %s\n", buffer.data()); + } + }); + } + + IPCHandler::~IPCHandler() + { + IPCHandler::WorkerCallbacks.clear(); + IPCHandler::ClientCallbacks.clear(); + } +} diff --git a/src/Components/Modules/IPCHandler.hpp b/src/Components/Modules/IPCHandler.hpp new file mode 100644 index 00000000..d9d2f72e --- /dev/null +++ b/src/Components/Modules/IPCHandler.hpp @@ -0,0 +1,33 @@ +#pragma once + +namespace Components +{ + class IPCHandler : public Component + { + public: + typedef Utils::Slot Callback; + + IPCHandler(); + ~IPCHandler(); + +#if defined(DEBUG) || defined(FORCE_UNIT_TESTS) + const char* getName() override { return "IPCHandler"; }; +#endif + + void SendWorker(std::string message, std::string data); + void SendClient(std::string message, std::string data); + + void OnWorker(std::string message, Callback callback); + void OnClient(std::string message, Callback callback); + + private: + static std::unique_ptr WorkerChannel; + static std::unique_ptr ClientChannel; + + static std::unordered_map WorkerCallbacks; + static std::unordered_map ClientCallbacks; + + static void InitChannels(); + static void StartWorker(); + }; +} diff --git a/src/Components/Modules/IPCPipe.cpp b/src/Components/Modules/IPCPipe.cpp deleted file mode 100644 index 1b558082..00000000 --- a/src/Components/Modules/IPCPipe.cpp +++ /dev/null @@ -1,296 +0,0 @@ -#include "STDInclude.hpp" - -namespace Components -{ - Pipe IPCPipe::ServerPipe; - Pipe IPCPipe::ClientPipe; - -#pragma region Pipe - - Pipe::Pipe() : connectCallback(0), pipe(INVALID_HANDLE_VALUE), threadAttached(false), type(IPCTYPE_NONE), reconnectAttempt(0) - { - this->destroy(); - } - - Pipe::~Pipe() - { - this->destroy(); - } - - bool Pipe::connect(std::string name) - { - this->destroy(); - - this->type = IPCTYPE_CLIENT; - this->setName(name); - - this->pipe = CreateFileA(this->pipeFile, GENERIC_READ | GENERIC_WRITE, 0, nullptr, OPEN_EXISTING, 0, nullptr); - - if (INVALID_HANDLE_VALUE == this->pipe) - { - Logger::Print("Failed to connect to the pipe\n"); - - if (this->reconnectAttempt < IPC_MAX_RECONNECTS) - { - Logger::Print("Attempting to reconnect to the pipe.\n"); - ++this->reconnectAttempt; - std::this_thread::sleep_for(500ms); - - return this->connect(name); - } - else - { - this->destroy(); - return false; - } - } - - this->reconnectAttempt = 0; - Logger::Print("Successfully connected to the pipe\n"); - - return true; - } - - bool Pipe::create(std::string name) - { - this->destroy(); - - this->type = IPCTYPE_SERVER; - this->setName(name); - - this->pipe = CreateNamedPipeA(this->pipeFile, PIPE_ACCESS_DUPLEX, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, sizeof(this->packet), sizeof(this->packet), NMPWAIT_USE_DEFAULT_WAIT, nullptr); - - if (INVALID_HANDLE_VALUE != this->pipe && this->pipe) - { - // Only create the thread, when not performing unit tests! - if (!Loader::PerformingUnitTests()) - { - this->threadAttached = true; - this->thread = std::thread(Pipe::ReceiveThread, this); - } - - Logger::Print("Pipe successfully created\n"); - return true; - } - - Logger::Print("Failed to create the pipe\n"); - this->destroy(); - return false; - } - - void Pipe::onConnect(Pipe::Callback callback) - { - this->connectCallback = callback; - } - - void Pipe::setCallback(std::string command, Utils::Slot callback) - { - this->packetCallbacks[command] = callback; - } - - bool Pipe::write(std::string command, std::string data) - { - if (this->type != IPCTYPE_CLIENT || this->pipe == INVALID_HANDLE_VALUE) return false; - - Pipe::Packet _packet; - strcpy_s(_packet.command, command.data()); - strcpy_s(_packet.buffer, data.data()); - - DWORD cbBytes; - return (WriteFile(this->pipe, &_packet, sizeof(_packet), &cbBytes, nullptr) || GetLastError() == ERROR_IO_PENDING); - } - - void Pipe::destroy() - { - //this->Type = IPCTYPE_NONE; - - //*this->PipeFile = 0; - //*this->PipeName = 0; - - if (this->pipe && INVALID_HANDLE_VALUE != this->pipe) - { - CancelIoEx(this->pipe, nullptr); - //DeleteFileA(this->pipeFile); - - if (this->type == IPCTYPE_SERVER) DisconnectNamedPipe(this->pipe); - - CloseHandle(this->pipe); - Logger::Print("Disconnected from the pipe.\n"); - } - - this->pipe = nullptr; - this->threadAttached = false; - - if (this->thread.joinable()) - { - Logger::Print("Terminating pipe thread...\n"); - - this->thread.join(); - - Logger::Print("Pipe thread terminated.\n"); - } - - this->thread = std::thread(); - } - - void Pipe::setName(std::string name) - { - memset(this->pipeName, 0, sizeof(this->pipeName)); - memset(this->pipeFile, 0, sizeof(this->pipeFile)); - - strncpy_s(this->pipeName, name.data(), sizeof(this->pipeName)); - sprintf_s(this->pipeFile, sizeof(this->pipeFile), "\\\\.\\Pipe\\%s", this->pipeName); - } - - void Pipe::ReceiveThread(Pipe* pipe) - { - if (!pipe || pipe->type != IPCTYPE_SERVER || pipe->pipe == INVALID_HANDLE_VALUE || !pipe->pipe) return; - - if (ConnectNamedPipe(pipe->pipe, nullptr) == FALSE) - { - Logger::Print("Failed to initialize pipe reading.\n"); - return; - } - - Logger::Print("Client connected to the pipe\n"); - pipe->connectCallback(); - - DWORD cbBytes; - - while (pipe->threadAttached && pipe->pipe && pipe->pipe != INVALID_HANDLE_VALUE) - { - BOOL bResult = ReadFile(pipe->pipe, &pipe->packet, sizeof(pipe->packet), &cbBytes, nullptr); - - if (bResult && cbBytes) - { - if (pipe->packetCallbacks.find(pipe->packet.command) != pipe->packetCallbacks.end()) - { - pipe->packetCallbacks[pipe->packet.command](pipe->packet.buffer); - } - } - else if (pipe->threadAttached && pipe->pipe != INVALID_HANDLE_VALUE) - { - Logger::Print("Failed to read from client through pipe\n"); - - DisconnectNamedPipe(pipe->pipe); - ConnectNamedPipe(pipe->pipe, nullptr); - pipe->connectCallback(); - } - - ZeroMemory(&pipe->packet, sizeof(pipe->packet)); - } - } - -#pragma endregion - - // Callback to connect first instance's client pipe to the second instance's server pipe - void IPCPipe::ConnectClient() - { - if (Singleton::IsFirstInstance()) - { - IPCPipe::ClientPipe.connect(IPC_PIPE_NAME_CLIENT); - } - } - - // Writes to the process on the other end of the pipe - bool IPCPipe::Write(std::string command, std::string data) - { - return IPCPipe::ClientPipe.write(command, data); - } - - // Installs a callback for receiving commands from the process on the other end of the pipe - void IPCPipe::On(std::string command, Utils::Slot callback) - { - IPCPipe::ServerPipe.setCallback(command, callback); - } - - IPCPipe::IPCPipe() - { - if (Dedicated::IsEnabled()) return; - - // Server pipe - IPCPipe::ServerPipe.onConnect(IPCPipe::ConnectClient); - IPCPipe::ServerPipe.create((Singleton::IsFirstInstance() ? IPC_PIPE_NAME_SERVER : IPC_PIPE_NAME_CLIENT)); - - // Connect second instance's client pipe to first instance's server pipe - if (!Singleton::IsFirstInstance()) - { - IPCPipe::ClientPipe.connect(IPC_PIPE_NAME_SERVER); - } - - IPCPipe::On("ping", [] (std::string data) - { - Logger::Print("Received ping form pipe, sending pong!\n"); - IPCPipe::Write("pong", data); - }); - - IPCPipe::On("pong", [] (std::string data) - { - Logger::Print("Received pong form pipe!\n"); - }); - - // Test pipe functionality by sending pings - Command::Add("ipcping", [] (Command::Params*) - { - Logger::Print("Sending ping to pipe!\n"); - IPCPipe::Write("ping", ""); - }); - - static Utils::IPC::BidirectionalChannel channel("iw4xChannel", !Worker::IsWorker()); - channel.send("Hello world!"); - - Command::Add("ipcchan", [](Command::Params* params) - { - channel.send(params->join(1)); - }); - - Command::Add("ipcchantest", [](Command::Params*) - { - std::string buffer; - buffer.reserve(2048); - - for(int i = 0; i < 1020; ++i) - { - buffer.append("a", 1); - } - - buffer.append("lolsnakerules!"); - - for (int i = 0; i < 1020; ++i) - { - buffer.append("a", 1); - } - - buffer.append("lolsnakerules!"); - - channel.send(buffer); - }); - - QuickPatch::OnFrame([]() - { - std::string buffer; - if(channel.receive(&buffer)) - { - Logger::Print("Data received: %s\n", buffer.data()); - } - }); - - STARTUPINFOA sInfo; - PROCESS_INFORMATION pInfo; - - ZeroMemory(&sInfo, sizeof(sInfo)); - ZeroMemory(&pInfo, sizeof(pInfo)); - sInfo.cb = sizeof(sInfo); - - CreateProcessA("iw4x.exe", const_cast(Utils::String::VA("-parent %d", GetCurrentProcessId())), nullptr, nullptr, false, NULL, nullptr, nullptr, &sInfo, &pInfo); - - if (pInfo.hThread && pInfo.hThread != INVALID_HANDLE_VALUE) CloseHandle(pInfo.hThread); - if (pInfo.hProcess && pInfo.hProcess != INVALID_HANDLE_VALUE) CloseHandle(pInfo.hProcess); - } - - void IPCPipe::preDestroy() - { - IPCPipe::ServerPipe.destroy(); - IPCPipe::ClientPipe.destroy(); - } -} diff --git a/src/Components/Modules/IPCPipe.hpp b/src/Components/Modules/IPCPipe.hpp deleted file mode 100644 index 58bbc713..00000000 --- a/src/Components/Modules/IPCPipe.hpp +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once - -#define IPC_MAX_RECONNECTS 3 -#define IPC_COMMAND_SIZE 100 -#define IPC_BUFFER_SIZE 0x2000 - -#define IPC_PIPE_NAME_SERVER "IW4x-Server" -#define IPC_PIPE_NAME_CLIENT "IW4x-Client" - -namespace Components -{ - class Pipe - { - public: - struct Packet - { - char command[IPC_COMMAND_SIZE]; - char buffer[IPC_BUFFER_SIZE]; - }; - - enum Type - { - IPCTYPE_NONE, - IPCTYPE_SERVER, - IPCTYPE_CLIENT - }; - - typedef void(__cdecl PacketCallback)(std::string data); - typedef void(__cdecl Callback)(); - - Pipe(); - ~Pipe(); - - bool connect(std::string name); - bool create(std::string name); - - bool write(std::string command, std::string data); - void setCallback(std::string command, Utils::Slot callback); - void onConnect(Callback callback); - - void destroy(); - - private: - Utils::Slot connectCallback; - std::map> packetCallbacks; - - HANDLE pipe; - std::thread thread; - bool threadAttached; - - Type type; - Packet packet; - - char pipeName[MAX_PATH]; - char pipeFile[MAX_PATH]; - unsigned int reconnectAttempt; - - void setName(std::string name); - - static void ReceiveThread(Pipe* pipe); - }; - - class IPCPipe : public Component - { - public: - IPCPipe(); - -#if defined(DEBUG) || defined(FORCE_UNIT_TESTS) - const char* getName() override { return "IPCPipe"; }; -#endif - - void preDestroy() override; - - static bool Write(std::string command, std::string data); - static void On(std::string command, Utils::Slot callback); - - private: - static Pipe ServerPipe; - static Pipe ClientPipe; - - static void ConnectClient(); - }; -} diff --git a/src/Proto/ipc.proto b/src/Proto/ipc.proto new file mode 100644 index 00000000..f2e71514 --- /dev/null +++ b/src/Proto/ipc.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package Proto.IPC; + +message Command +{ + bytes command = 1; + bytes data = 2; +} diff --git a/src/STDInclude.hpp b/src/STDInclude.hpp index d6cb65ab..971715fd 100644 --- a/src/STDInclude.hpp +++ b/src/STDInclude.hpp @@ -85,6 +85,7 @@ template class Sizer { }; #include "proto/auth.pb.h" #include "proto/node.pb.h" #include "proto/rcon.pb.h" +#include "proto/ipc.pb.h" #pragma warning(pop) diff --git a/src/Worker/Runner.cpp b/src/Worker/Runner.cpp index 19e56d93..be4b63a1 100644 --- a/src/Worker/Runner.cpp +++ b/src/Worker/Runner.cpp @@ -29,8 +29,8 @@ namespace Worker WaitForSingleObject(processHandle, INFINITE); CloseHandle(processHandle); - this->terminate = true; printf("Awaiting worker termination...\n"); + this->terminate = true; if (workerThread.joinable()) workerThread.join(); printf("Worker terminated\n"); } @@ -39,7 +39,7 @@ namespace Worker void Runner::worker() { printf("Worker started\n"); - Utils::IPC::BidirectionalChannel channel("iw4xChannel", !Worker::IsWorker()); + Utils::IPC::BidirectionalChannel channel("IW4x-Worker-Channel", !Worker::IsWorker()); while (!this->terminate) {