diff --git a/src/Components/Loader.cpp b/src/Components/Loader.cpp index 8bb53622..4b3aea86 100644 --- a/src/Components/Loader.cpp +++ b/src/Components/Loader.cpp @@ -44,6 +44,7 @@ namespace Components Loader::Register(new Command()); Loader::Register(new Console()); Loader::Register(new Friends()); + Loader::Register(new IPCPipe()); Loader::Register(new ModList()); Loader::Register(new Network()); Loader::Register(new Theatre()); @@ -68,7 +69,6 @@ namespace Components Loader::Register(new BitMessage()); #endif Loader::Register(new FileSystem()); - Loader::Register(new IPCHandler()); Loader::Register(new ModelSurfs()); Loader::Register(new PlayerName()); Loader::Register(new QuickPatch()); diff --git a/src/Components/Loader.hpp b/src/Components/Loader.hpp index 8c1ae8d2..48297593 100644 --- a/src/Components/Loader.hpp +++ b/src/Components/Loader.hpp @@ -68,6 +68,7 @@ namespace Components #include "Modules/Party.hpp" // Destroys the order, but requires network classes :D #include "Modules/Logger.hpp" #include "Modules/Friends.hpp" +#include "Modules/IPCPipe.hpp" #include "Modules/Download.hpp" #include "Modules/Playlist.hpp" #include "Modules/RawFiles.hpp" @@ -86,7 +87,6 @@ namespace Components #include "Modules/Threading.hpp" #include "Modules/BitMessage.hpp" #include "Modules/FileSystem.hpp" -#include "Modules/IPCHandler.hpp" #include "Modules/ModelSurfs.hpp" #include "Modules/PlayerName.hpp" #include "Modules/QuickPatch.hpp" diff --git a/src/Components/Modules/ConnectProtocol.cpp b/src/Components/Modules/ConnectProtocol.cpp index a70201ee..defdd32f 100644 --- a/src/Components/Modules/ConnectProtocol.cpp +++ b/src/Components/Modules/ConnectProtocol.cpp @@ -212,7 +212,7 @@ namespace Components ConnectProtocol::ConnectProtocol() { // IPC handler - IPCHandler::OnClient("connect", [] (std::string data) + IPCPipe::On("connect", [] (std::string data) { Command::Execute(Utils::String::VA("connect %s", data.data()), false); }); @@ -229,7 +229,7 @@ namespace Components { if (!Singleton::IsFirstInstance()) { - IPCHandler::SendClient("connect", ConnectProtocol::ConnectString); + IPCPipe::Write("connect", ConnectProtocol::ConnectString); ExitProcess(0); } else diff --git a/src/Components/Modules/IPCHandler.cpp b/src/Components/Modules/IPCHandler.cpp deleted file mode 100644 index 1f05893e..00000000 --- a/src/Components/Modules/IPCHandler.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include "STDInclude.hpp" - -namespace Components -{ - std::unordered_map IPCHandler::ClientCallbacks; - std::unique_ptr IPCHandler::ClientChannel; - - void IPCHandler::SendClient(std::string message, std::string data) - { - IPCHandler::InitChannel(); - - Proto::IPC::Command command; - command.set_name(message); - command.set_data(data); - - IPCHandler::ClientChannel->send(command.SerializeAsString()); - } - - void IPCHandler::OnClient(std::string message, IPCHandler::Callback callback) - { - IPCHandler::ClientCallbacks[message] = callback; - } - - void IPCHandler::InitChannel() - { - if (!IPCHandler::ClientChannel) - { - IPCHandler::ClientChannel.reset(new Utils::IPC::BidirectionalChannel("IW4x-Client-Channel", Singleton::IsFirstInstance())); - } - } - - void IPCHandler::HandleClient() - { - IPCHandler::InitChannel(); - - std::string packet; - if(IPCHandler::ClientChannel->receive(&packet)) - { - Proto::IPC::Command command; - if(command.ParseFromString(packet)) - { - auto callback = IPCHandler::ClientCallbacks.find(command.name()); - if (callback != IPCHandler::ClientCallbacks.end()) - { - callback->second(command.data()); - } - } - } - } - - IPCHandler::IPCHandler() - { - if (Dedicated::IsEnabled() || ZoneBuilder::IsEnabled() || Loader::PerformingUnitTests()) return; - - IPCHandler::InitChannel(); - - QuickPatch::OnFrame(IPCHandler::HandleClient); - } - - IPCHandler::~IPCHandler() - { - IPCHandler::ClientCallbacks.clear(); - IPCHandler::ClientChannel.release(); - } -} diff --git a/src/Components/Modules/IPCHandler.hpp b/src/Components/Modules/IPCHandler.hpp deleted file mode 100644 index fe28bc47..00000000 --- a/src/Components/Modules/IPCHandler.hpp +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once - -namespace Components -{ - class IPCHandler : public Component - { - public: - typedef Utils::Slot Callback; - - IPCHandler(); - ~IPCHandler(); - -#if defined(DEBUG) || defined(FORCE_UNIT_TESTS) - const char* getName() override { return "IPCHandler"; }; -#endif - - static void SendClient(std::string message, std::string data); - static void OnClient(std::string message, Callback callback); - - private: - static std::unique_ptr ClientChannel; - static std::unordered_map ClientCallbacks; - - static void InitChannel(); - static void HandleClient(); - }; -} diff --git a/src/Components/Modules/IPCPipe.cpp b/src/Components/Modules/IPCPipe.cpp new file mode 100644 index 00000000..bc664de4 --- /dev/null +++ b/src/Components/Modules/IPCPipe.cpp @@ -0,0 +1,245 @@ +#include "STDInclude.hpp" + +namespace Components +{ + Pipe IPCPipe::ServerPipe; + Pipe IPCPipe::ClientPipe; + +#pragma region Pipe + + Pipe::Pipe() : connectCallback(0), pipe(INVALID_HANDLE_VALUE), threadAttached(false), type(IPCTYPE_NONE), reconnectAttempt(0) + { + this->destroy(); + } + + Pipe::~Pipe() + { + this->destroy(); + } + + bool Pipe::connect(std::string name) + { + this->destroy(); + + this->type = IPCTYPE_CLIENT; + this->setName(name); + + this->pipe = CreateFileA(this->pipeFile, GENERIC_READ | GENERIC_WRITE, 0, nullptr, OPEN_EXISTING, 0, nullptr); + + if (INVALID_HANDLE_VALUE == this->pipe) + { + Logger::Print("Failed to connect to the pipe\n"); + + if (this->reconnectAttempt < IPC_MAX_RECONNECTS) + { + Logger::Print("Attempting to reconnect to the pipe.\n"); + ++this->reconnectAttempt; + std::this_thread::sleep_for(500ms); + + return this->connect(name); + } + else + { + this->destroy(); + return false; + } + } + + this->reconnectAttempt = 0; + Logger::Print("Successfully connected to the pipe\n"); + + return true; + } + + bool Pipe::create(std::string name) + { + this->destroy(); + + this->type = IPCTYPE_SERVER; + this->setName(name); + + this->pipe = CreateNamedPipeA(this->pipeFile, PIPE_ACCESS_DUPLEX, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, sizeof(this->packet), sizeof(this->packet), NMPWAIT_USE_DEFAULT_WAIT, nullptr); + + if (INVALID_HANDLE_VALUE != this->pipe && this->pipe) + { + // Only create the thread, when not performing unit tests! + if (!Loader::PerformingUnitTests()) + { + this->threadAttached = true; + this->thread = std::thread(Pipe::ReceiveThread, this); + } + + Logger::Print("Pipe successfully created\n"); + return true; + } + + Logger::Print("Failed to create the pipe\n"); + this->destroy(); + return false; + } + + void Pipe::onConnect(Pipe::Callback callback) + { + this->connectCallback = callback; + } + + void Pipe::setCallback(std::string command, Utils::Slot callback) + { + this->packetCallbacks[command] = callback; + } + + bool Pipe::write(std::string command, std::string data) + { + if (this->type != IPCTYPE_CLIENT || this->pipe == INVALID_HANDLE_VALUE) return false; + + Pipe::Packet _packet; + strcpy_s(_packet.command, command.data()); + strcpy_s(_packet.buffer, data.data()); + + DWORD cbBytes; + return (WriteFile(this->pipe, &_packet, sizeof(_packet), &cbBytes, nullptr) || GetLastError() == ERROR_IO_PENDING); + } + + void Pipe::destroy() + { + //this->Type = IPCTYPE_NONE; + + //*this->PipeFile = 0; + //*this->PipeName = 0; + + if (this->pipe && INVALID_HANDLE_VALUE != this->pipe) + { + CancelIoEx(this->pipe, nullptr); + //DeleteFileA(this->pipeFile); + + if (this->type == IPCTYPE_SERVER) DisconnectNamedPipe(this->pipe); + + CloseHandle(this->pipe); + Logger::Print("Disconnected from the pipe.\n"); + } + + this->pipe = nullptr; + this->threadAttached = false; + + if (this->thread.joinable()) + { + Logger::Print("Terminating pipe thread...\n"); + + this->thread.join(); + + Logger::Print("Pipe thread terminated.\n"); + } + + this->thread = std::thread(); + } + + void Pipe::setName(std::string name) + { + memset(this->pipeName, 0, sizeof(this->pipeName)); + memset(this->pipeFile, 0, sizeof(this->pipeFile)); + + strncpy_s(this->pipeName, name.data(), sizeof(this->pipeName)); + sprintf_s(this->pipeFile, sizeof(this->pipeFile), "\\\\.\\Pipe\\%s", this->pipeName); + } + + void Pipe::ReceiveThread(Pipe* pipe) + { + if (!pipe || pipe->type != IPCTYPE_SERVER || pipe->pipe == INVALID_HANDLE_VALUE || !pipe->pipe) return; + + if (ConnectNamedPipe(pipe->pipe, nullptr) == FALSE) + { + Logger::Print("Failed to initialize pipe reading.\n"); + return; + } + + Logger::Print("Client connected to the pipe\n"); + pipe->connectCallback(); + + DWORD cbBytes; + + while (pipe->threadAttached && pipe->pipe && pipe->pipe != INVALID_HANDLE_VALUE) + { + BOOL bResult = ReadFile(pipe->pipe, &pipe->packet, sizeof(pipe->packet), &cbBytes, nullptr); + + if (bResult && cbBytes) + { + if (pipe->packetCallbacks.find(pipe->packet.command) != pipe->packetCallbacks.end()) + { + pipe->packetCallbacks[pipe->packet.command](pipe->packet.buffer); + } + } + else if (pipe->threadAttached && pipe->pipe != INVALID_HANDLE_VALUE) + { + Logger::Print("Failed to read from client through pipe\n"); + + DisconnectNamedPipe(pipe->pipe); + ConnectNamedPipe(pipe->pipe, nullptr); + pipe->connectCallback(); + } + + ZeroMemory(&pipe->packet, sizeof(pipe->packet)); + } + } + +#pragma endregion + + // Callback to connect first instance's client pipe to the second instance's server pipe + void IPCPipe::ConnectClient() + { + if (Singleton::IsFirstInstance()) + { + IPCPipe::ClientPipe.connect(IPC_PIPE_NAME_CLIENT); + } + } + + // Writes to the process on the other end of the pipe + bool IPCPipe::Write(std::string command, std::string data) + { + return IPCPipe::ClientPipe.write(command, data); + } + + // Installs a callback for receiving commands from the process on the other end of the pipe + void IPCPipe::On(std::string command, Utils::Slot callback) + { + IPCPipe::ServerPipe.setCallback(command, callback); + } + + IPCPipe::IPCPipe() + { + if (Dedicated::IsEnabled()) return; + + // Server pipe + IPCPipe::ServerPipe.onConnect(IPCPipe::ConnectClient); + IPCPipe::ServerPipe.create((Singleton::IsFirstInstance() ? IPC_PIPE_NAME_SERVER : IPC_PIPE_NAME_CLIENT)); + + // Connect second instance's client pipe to first instance's server pipe + if (!Singleton::IsFirstInstance()) + { + IPCPipe::ClientPipe.connect(IPC_PIPE_NAME_SERVER); + } + + IPCPipe::On("ping", [] (std::string data) + { + Logger::Print("Received ping form pipe, sending pong!\n"); + IPCPipe::Write("pong", data); + }); + + IPCPipe::On("pong", [] (std::string data) + { + Logger::Print("Received pong form pipe!\n"); + }); + + // Test pipe functionality by sending pings + Command::Add("ipcping", [] (Command::Params*) + { + Logger::Print("Sending ping to pipe!\n"); + IPCPipe::Write("ping", ""); + }); + } + + void IPCPipe::preDestroy() + { + IPCPipe::ServerPipe.destroy(); + IPCPipe::ClientPipe.destroy(); + } +} diff --git a/src/Components/Modules/IPCPipe.hpp b/src/Components/Modules/IPCPipe.hpp new file mode 100644 index 00000000..58bbc713 --- /dev/null +++ b/src/Components/Modules/IPCPipe.hpp @@ -0,0 +1,83 @@ +#pragma once + +#define IPC_MAX_RECONNECTS 3 +#define IPC_COMMAND_SIZE 100 +#define IPC_BUFFER_SIZE 0x2000 + +#define IPC_PIPE_NAME_SERVER "IW4x-Server" +#define IPC_PIPE_NAME_CLIENT "IW4x-Client" + +namespace Components +{ + class Pipe + { + public: + struct Packet + { + char command[IPC_COMMAND_SIZE]; + char buffer[IPC_BUFFER_SIZE]; + }; + + enum Type + { + IPCTYPE_NONE, + IPCTYPE_SERVER, + IPCTYPE_CLIENT + }; + + typedef void(__cdecl PacketCallback)(std::string data); + typedef void(__cdecl Callback)(); + + Pipe(); + ~Pipe(); + + bool connect(std::string name); + bool create(std::string name); + + bool write(std::string command, std::string data); + void setCallback(std::string command, Utils::Slot callback); + void onConnect(Callback callback); + + void destroy(); + + private: + Utils::Slot connectCallback; + std::map> packetCallbacks; + + HANDLE pipe; + std::thread thread; + bool threadAttached; + + Type type; + Packet packet; + + char pipeName[MAX_PATH]; + char pipeFile[MAX_PATH]; + unsigned int reconnectAttempt; + + void setName(std::string name); + + static void ReceiveThread(Pipe* pipe); + }; + + class IPCPipe : public Component + { + public: + IPCPipe(); + +#if defined(DEBUG) || defined(FORCE_UNIT_TESTS) + const char* getName() override { return "IPCPipe"; }; +#endif + + void preDestroy() override; + + static bool Write(std::string command, std::string data); + static void On(std::string command, Utils::Slot callback); + + private: + static Pipe ServerPipe; + static Pipe ClientPipe; + + static void ConnectClient(); + }; +} diff --git a/src/STDInclude.hpp b/src/STDInclude.hpp index 31181d31..9b9bb491 100644 --- a/src/STDInclude.hpp +++ b/src/STDInclude.hpp @@ -19,11 +19,6 @@ #include #include -#pragma warning(push) -#pragma warning(disable: 4996) -#include -#pragma warning(pop) - #include #include #include @@ -98,7 +93,6 @@ template class Sizer { }; #endif #include "Utils/IO.hpp" -#include "Utils/IPC.hpp" #include "Utils/CSV.hpp" #include "Utils/Time.hpp" #include "Utils/Cache.hpp" diff --git a/src/Utils/IPC.cpp b/src/Utils/IPC.cpp deleted file mode 100644 index 3822db53..00000000 --- a/src/Utils/IPC.cpp +++ /dev/null @@ -1,144 +0,0 @@ -#include "STDInclude.hpp" - -namespace Utils -{ - namespace IPC - { - Channel::Channel(std::string _name, int _queueSize, int _bufferSize, bool _remove) : terminateQueue(false), remove(_remove), name(_name) - { - if(this->remove) boost::interprocess::message_queue::remove(this->name.data()); - queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), _queueSize, _bufferSize + sizeof(Channel::Header))); - - this->queueThread = std::thread(&Channel::queueWorker, this); - } - - Channel::~Channel() - { - std::unique_lock lock(this->queueMutex); - this->terminateQueue = true; - this->queueEvent.notify_all(); - lock.unlock(); - - if (this->queueThread.joinable()) - { - this->queueThread.join(); - } - - if (this->remove) boost::interprocess::message_queue::remove(this->name.data()); - } - - bool Channel::receive(std::string* data) - { - if (!data) return false; - data->clear(); - - if(this->packet.size() < this->queue->get_max_msg_size()) - { - packet.resize(this->queue->get_max_msg_size()); - } - - const Channel::Header* header = reinterpret_cast(packet.data()); - const char* buffer = reinterpret_cast(header + 1); - - unsigned int priority; - boost::interprocess::message_queue::size_type recvd_size; - - if (this->queue->try_receive(const_cast(packet.data()), packet.size(), recvd_size, priority)) - { - if ((recvd_size - sizeof(Channel::Header)) != header->fragmentSize || header->fragmentPart) return false; - data->append(buffer, header->fragmentSize); - - if(header->fragmented) - { - Channel::Header mainHeader = *header; - unsigned int part = mainHeader.fragmentPart; - - while (true) - { - if (!this->queue->try_receive(const_cast(packet.data()), packet.size(), recvd_size, priority)) return false; - - if (header->packetId != mainHeader.packetId || header->totalSize != mainHeader.totalSize || header->fragmentPart != (++part)) - { - data->clear(); - return false; - } - - data->append(buffer, header->fragmentSize); - - if(header->totalSize <= data->size()) - { - break; - } - } - } - - return true; - } - - return false; - } - - void Channel::send(std::string data) - { - const size_t fragmentSize = (this->queue->get_max_msg_size() - sizeof(Channel::Header)) - 1; - - Channel::Header header; - header.fragmented = (data.size() + sizeof(Channel::Header)) > this->queue->get_max_msg_size(); - header.packetId = static_cast(timeGetTime() + rand());; - header.totalSize = data.size(); - - size_t sentSize = 0; - for (unsigned short i = 0; sentSize < data.size(); ++i) - { - header.fragmentPart = i; - header.fragmentSize = std::min(fragmentSize, data.size() - (fragmentSize * i)); - - std::string buffer; - buffer.append(reinterpret_cast(&header), sizeof(Channel::Header)); - buffer.append(data.data() + sentSize, header.fragmentSize); - Channel::enqueueMessage(buffer); - - sentSize += header.fragmentSize; - } - } - - void Channel::enqueueMessage(std::string data) - { - if (data.size() <= this->queue->get_max_msg_size()) - { - std::lock_guard _(this->queueMutex); - this->internalQueue.push(data); - this->queueEvent.notify_all(); - } - } - - void Channel::queueWorker() - { - while (!this->terminateQueue) - { - std::unique_lock lock(this->queueMutex); - - while(!this->terminateQueue && this->internalQueue.empty()) - { - this->queueEvent.wait(lock); - } - - while(!this->terminateQueue && !this->internalQueue.empty()) - { - std::string data = this->internalQueue.front(); - this->internalQueue.pop(); - - if (data.size() <= this->queue->get_max_msg_size()) - { - while (!this->terminateQueue && !this->queue->try_send(data.data(), data.size(), 0)) - { - lock.unlock(); - std::this_thread::sleep_for(1000us); - lock.lock(); - } - } - } - } - } - } -} diff --git a/src/Utils/IPC.hpp b/src/Utils/IPC.hpp deleted file mode 100644 index b9b63a99..00000000 --- a/src/Utils/IPC.hpp +++ /dev/null @@ -1,106 +0,0 @@ -#pragma once - -namespace boost -{ - typedef unsigned long long ulong_long_type; -} - -#pragma warning(push) -#pragma warning(disable: 4091) -#pragma warning(disable: 4996) -#pragma warning(disable: 6248) -#pragma warning(disable: 6282) -#pragma warning(disable: 6285) -#pragma warning(disable: 6388) -#pragma warning(disable: 28159) -#define _SCL_SECURE_NO_WARNINGS - -#ifdef sleep -#undef sleep -#endif - -//#define BOOST_DISABLE_WIN32 -#define BOOST_USE_WINDOWS_H -#define BOOST_DATE_TIME_NO_LIB -#include - -#undef _SCL_SECURE_NO_WARNINGS -#pragma warning(pop) - -namespace Utils -{ - namespace IPC - { - class Channel - { - public: - Channel(std::string _name, int _queueSize = 100, int _bufferSize = 1024, bool _remove = false); - ~Channel(); - - bool receive(std::string* data); - void send(std::string data); - - private: - struct Header - { - bool fragmented; - unsigned short packetId; - size_t fragmentSize; - size_t totalSize; - unsigned int fragmentPart; - }; - - void enqueueMessage(std::string data); - void queueWorker(); - - bool terminateQueue; - std::condition_variable queueEvent; - std::mutex queueMutex; - std::thread queueThread; - std::queue internalQueue; - - bool remove; - std::unique_ptr queue; - std::string packet; - std::string name; - }; - - class BidirectionalChannel - { - public: - BidirectionalChannel(std::string name, bool server, int queueSize = 100, int bufferSize = 1024) : isServer(server), - channel1(name, queueSize, bufferSize, server), - channel2(name + "2", queueSize, bufferSize, server) - {} - - bool receive(std::string* data) - { - if(this->isServer) - { - return channel1.receive(data); - } - else - { - return channel2.receive(data); - } - } - - void send(std::string data) - { - if (this->isServer) - { - return channel2.send(data); - } - else - { - return channel1.send(data); - } - } - - private: - const bool isServer; - Channel channel1; - Channel channel2; - }; - } -}