[IPCPipe] Use old IPC handler again
This commit is contained in:
parent
c9082f434a
commit
7d0b34da1a
@ -44,6 +44,7 @@ namespace Components
|
||||
Loader::Register(new Command());
|
||||
Loader::Register(new Console());
|
||||
Loader::Register(new Friends());
|
||||
Loader::Register(new IPCPipe());
|
||||
Loader::Register(new ModList());
|
||||
Loader::Register(new Network());
|
||||
Loader::Register(new Theatre());
|
||||
@ -68,7 +69,6 @@ namespace Components
|
||||
Loader::Register(new BitMessage());
|
||||
#endif
|
||||
Loader::Register(new FileSystem());
|
||||
Loader::Register(new IPCHandler());
|
||||
Loader::Register(new ModelSurfs());
|
||||
Loader::Register(new PlayerName());
|
||||
Loader::Register(new QuickPatch());
|
||||
|
@ -68,6 +68,7 @@ namespace Components
|
||||
#include "Modules/Party.hpp" // Destroys the order, but requires network classes :D
|
||||
#include "Modules/Logger.hpp"
|
||||
#include "Modules/Friends.hpp"
|
||||
#include "Modules/IPCPipe.hpp"
|
||||
#include "Modules/Download.hpp"
|
||||
#include "Modules/Playlist.hpp"
|
||||
#include "Modules/RawFiles.hpp"
|
||||
@ -86,7 +87,6 @@ namespace Components
|
||||
#include "Modules/Threading.hpp"
|
||||
#include "Modules/BitMessage.hpp"
|
||||
#include "Modules/FileSystem.hpp"
|
||||
#include "Modules/IPCHandler.hpp"
|
||||
#include "Modules/ModelSurfs.hpp"
|
||||
#include "Modules/PlayerName.hpp"
|
||||
#include "Modules/QuickPatch.hpp"
|
||||
|
@ -212,7 +212,7 @@ namespace Components
|
||||
ConnectProtocol::ConnectProtocol()
|
||||
{
|
||||
// IPC handler
|
||||
IPCHandler::OnClient("connect", [] (std::string data)
|
||||
IPCPipe::On("connect", [] (std::string data)
|
||||
{
|
||||
Command::Execute(Utils::String::VA("connect %s", data.data()), false);
|
||||
});
|
||||
@ -229,7 +229,7 @@ namespace Components
|
||||
{
|
||||
if (!Singleton::IsFirstInstance())
|
||||
{
|
||||
IPCHandler::SendClient("connect", ConnectProtocol::ConnectString);
|
||||
IPCPipe::Write("connect", ConnectProtocol::ConnectString);
|
||||
ExitProcess(0);
|
||||
}
|
||||
else
|
||||
|
@ -1,65 +0,0 @@
|
||||
#include "STDInclude.hpp"
|
||||
|
||||
namespace Components
|
||||
{
|
||||
std::unordered_map<std::string, IPCHandler::Callback> IPCHandler::ClientCallbacks;
|
||||
std::unique_ptr<Utils::IPC::BidirectionalChannel> IPCHandler::ClientChannel;
|
||||
|
||||
void IPCHandler::SendClient(std::string message, std::string data)
|
||||
{
|
||||
IPCHandler::InitChannel();
|
||||
|
||||
Proto::IPC::Command command;
|
||||
command.set_name(message);
|
||||
command.set_data(data);
|
||||
|
||||
IPCHandler::ClientChannel->send(command.SerializeAsString());
|
||||
}
|
||||
|
||||
void IPCHandler::OnClient(std::string message, IPCHandler::Callback callback)
|
||||
{
|
||||
IPCHandler::ClientCallbacks[message] = callback;
|
||||
}
|
||||
|
||||
void IPCHandler::InitChannel()
|
||||
{
|
||||
if (!IPCHandler::ClientChannel)
|
||||
{
|
||||
IPCHandler::ClientChannel.reset(new Utils::IPC::BidirectionalChannel("IW4x-Client-Channel", Singleton::IsFirstInstance()));
|
||||
}
|
||||
}
|
||||
|
||||
void IPCHandler::HandleClient()
|
||||
{
|
||||
IPCHandler::InitChannel();
|
||||
|
||||
std::string packet;
|
||||
if(IPCHandler::ClientChannel->receive(&packet))
|
||||
{
|
||||
Proto::IPC::Command command;
|
||||
if(command.ParseFromString(packet))
|
||||
{
|
||||
auto callback = IPCHandler::ClientCallbacks.find(command.name());
|
||||
if (callback != IPCHandler::ClientCallbacks.end())
|
||||
{
|
||||
callback->second(command.data());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
IPCHandler::IPCHandler()
|
||||
{
|
||||
if (Dedicated::IsEnabled() || ZoneBuilder::IsEnabled() || Loader::PerformingUnitTests()) return;
|
||||
|
||||
IPCHandler::InitChannel();
|
||||
|
||||
QuickPatch::OnFrame(IPCHandler::HandleClient);
|
||||
}
|
||||
|
||||
IPCHandler::~IPCHandler()
|
||||
{
|
||||
IPCHandler::ClientCallbacks.clear();
|
||||
IPCHandler::ClientChannel.release();
|
||||
}
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
namespace Components
|
||||
{
|
||||
class IPCHandler : public Component
|
||||
{
|
||||
public:
|
||||
typedef Utils::Slot<void(std::string)> Callback;
|
||||
|
||||
IPCHandler();
|
||||
~IPCHandler();
|
||||
|
||||
#if defined(DEBUG) || defined(FORCE_UNIT_TESTS)
|
||||
const char* getName() override { return "IPCHandler"; };
|
||||
#endif
|
||||
|
||||
static void SendClient(std::string message, std::string data);
|
||||
static void OnClient(std::string message, Callback callback);
|
||||
|
||||
private:
|
||||
static std::unique_ptr<Utils::IPC::BidirectionalChannel> ClientChannel;
|
||||
static std::unordered_map<std::string, Callback> ClientCallbacks;
|
||||
|
||||
static void InitChannel();
|
||||
static void HandleClient();
|
||||
};
|
||||
}
|
245
src/Components/Modules/IPCPipe.cpp
Normal file
245
src/Components/Modules/IPCPipe.cpp
Normal file
@ -0,0 +1,245 @@
|
||||
#include "STDInclude.hpp"
|
||||
|
||||
namespace Components
|
||||
{
|
||||
Pipe IPCPipe::ServerPipe;
|
||||
Pipe IPCPipe::ClientPipe;
|
||||
|
||||
#pragma region Pipe
|
||||
|
||||
Pipe::Pipe() : connectCallback(0), pipe(INVALID_HANDLE_VALUE), threadAttached(false), type(IPCTYPE_NONE), reconnectAttempt(0)
|
||||
{
|
||||
this->destroy();
|
||||
}
|
||||
|
||||
Pipe::~Pipe()
|
||||
{
|
||||
this->destroy();
|
||||
}
|
||||
|
||||
bool Pipe::connect(std::string name)
|
||||
{
|
||||
this->destroy();
|
||||
|
||||
this->type = IPCTYPE_CLIENT;
|
||||
this->setName(name);
|
||||
|
||||
this->pipe = CreateFileA(this->pipeFile, GENERIC_READ | GENERIC_WRITE, 0, nullptr, OPEN_EXISTING, 0, nullptr);
|
||||
|
||||
if (INVALID_HANDLE_VALUE == this->pipe)
|
||||
{
|
||||
Logger::Print("Failed to connect to the pipe\n");
|
||||
|
||||
if (this->reconnectAttempt < IPC_MAX_RECONNECTS)
|
||||
{
|
||||
Logger::Print("Attempting to reconnect to the pipe.\n");
|
||||
++this->reconnectAttempt;
|
||||
std::this_thread::sleep_for(500ms);
|
||||
|
||||
return this->connect(name);
|
||||
}
|
||||
else
|
||||
{
|
||||
this->destroy();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
this->reconnectAttempt = 0;
|
||||
Logger::Print("Successfully connected to the pipe\n");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Pipe::create(std::string name)
|
||||
{
|
||||
this->destroy();
|
||||
|
||||
this->type = IPCTYPE_SERVER;
|
||||
this->setName(name);
|
||||
|
||||
this->pipe = CreateNamedPipeA(this->pipeFile, PIPE_ACCESS_DUPLEX, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, sizeof(this->packet), sizeof(this->packet), NMPWAIT_USE_DEFAULT_WAIT, nullptr);
|
||||
|
||||
if (INVALID_HANDLE_VALUE != this->pipe && this->pipe)
|
||||
{
|
||||
// Only create the thread, when not performing unit tests!
|
||||
if (!Loader::PerformingUnitTests())
|
||||
{
|
||||
this->threadAttached = true;
|
||||
this->thread = std::thread(Pipe::ReceiveThread, this);
|
||||
}
|
||||
|
||||
Logger::Print("Pipe successfully created\n");
|
||||
return true;
|
||||
}
|
||||
|
||||
Logger::Print("Failed to create the pipe\n");
|
||||
this->destroy();
|
||||
return false;
|
||||
}
|
||||
|
||||
void Pipe::onConnect(Pipe::Callback callback)
|
||||
{
|
||||
this->connectCallback = callback;
|
||||
}
|
||||
|
||||
void Pipe::setCallback(std::string command, Utils::Slot<Pipe::PacketCallback> callback)
|
||||
{
|
||||
this->packetCallbacks[command] = callback;
|
||||
}
|
||||
|
||||
bool Pipe::write(std::string command, std::string data)
|
||||
{
|
||||
if (this->type != IPCTYPE_CLIENT || this->pipe == INVALID_HANDLE_VALUE) return false;
|
||||
|
||||
Pipe::Packet _packet;
|
||||
strcpy_s(_packet.command, command.data());
|
||||
strcpy_s(_packet.buffer, data.data());
|
||||
|
||||
DWORD cbBytes;
|
||||
return (WriteFile(this->pipe, &_packet, sizeof(_packet), &cbBytes, nullptr) || GetLastError() == ERROR_IO_PENDING);
|
||||
}
|
||||
|
||||
void Pipe::destroy()
|
||||
{
|
||||
//this->Type = IPCTYPE_NONE;
|
||||
|
||||
//*this->PipeFile = 0;
|
||||
//*this->PipeName = 0;
|
||||
|
||||
if (this->pipe && INVALID_HANDLE_VALUE != this->pipe)
|
||||
{
|
||||
CancelIoEx(this->pipe, nullptr);
|
||||
//DeleteFileA(this->pipeFile);
|
||||
|
||||
if (this->type == IPCTYPE_SERVER) DisconnectNamedPipe(this->pipe);
|
||||
|
||||
CloseHandle(this->pipe);
|
||||
Logger::Print("Disconnected from the pipe.\n");
|
||||
}
|
||||
|
||||
this->pipe = nullptr;
|
||||
this->threadAttached = false;
|
||||
|
||||
if (this->thread.joinable())
|
||||
{
|
||||
Logger::Print("Terminating pipe thread...\n");
|
||||
|
||||
this->thread.join();
|
||||
|
||||
Logger::Print("Pipe thread terminated.\n");
|
||||
}
|
||||
|
||||
this->thread = std::thread();
|
||||
}
|
||||
|
||||
void Pipe::setName(std::string name)
|
||||
{
|
||||
memset(this->pipeName, 0, sizeof(this->pipeName));
|
||||
memset(this->pipeFile, 0, sizeof(this->pipeFile));
|
||||
|
||||
strncpy_s(this->pipeName, name.data(), sizeof(this->pipeName));
|
||||
sprintf_s(this->pipeFile, sizeof(this->pipeFile), "\\\\.\\Pipe\\%s", this->pipeName);
|
||||
}
|
||||
|
||||
void Pipe::ReceiveThread(Pipe* pipe)
|
||||
{
|
||||
if (!pipe || pipe->type != IPCTYPE_SERVER || pipe->pipe == INVALID_HANDLE_VALUE || !pipe->pipe) return;
|
||||
|
||||
if (ConnectNamedPipe(pipe->pipe, nullptr) == FALSE)
|
||||
{
|
||||
Logger::Print("Failed to initialize pipe reading.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
Logger::Print("Client connected to the pipe\n");
|
||||
pipe->connectCallback();
|
||||
|
||||
DWORD cbBytes;
|
||||
|
||||
while (pipe->threadAttached && pipe->pipe && pipe->pipe != INVALID_HANDLE_VALUE)
|
||||
{
|
||||
BOOL bResult = ReadFile(pipe->pipe, &pipe->packet, sizeof(pipe->packet), &cbBytes, nullptr);
|
||||
|
||||
if (bResult && cbBytes)
|
||||
{
|
||||
if (pipe->packetCallbacks.find(pipe->packet.command) != pipe->packetCallbacks.end())
|
||||
{
|
||||
pipe->packetCallbacks[pipe->packet.command](pipe->packet.buffer);
|
||||
}
|
||||
}
|
||||
else if (pipe->threadAttached && pipe->pipe != INVALID_HANDLE_VALUE)
|
||||
{
|
||||
Logger::Print("Failed to read from client through pipe\n");
|
||||
|
||||
DisconnectNamedPipe(pipe->pipe);
|
||||
ConnectNamedPipe(pipe->pipe, nullptr);
|
||||
pipe->connectCallback();
|
||||
}
|
||||
|
||||
ZeroMemory(&pipe->packet, sizeof(pipe->packet));
|
||||
}
|
||||
}
|
||||
|
||||
#pragma endregion
|
||||
|
||||
// Callback to connect first instance's client pipe to the second instance's server pipe
|
||||
void IPCPipe::ConnectClient()
|
||||
{
|
||||
if (Singleton::IsFirstInstance())
|
||||
{
|
||||
IPCPipe::ClientPipe.connect(IPC_PIPE_NAME_CLIENT);
|
||||
}
|
||||
}
|
||||
|
||||
// Writes to the process on the other end of the pipe
|
||||
bool IPCPipe::Write(std::string command, std::string data)
|
||||
{
|
||||
return IPCPipe::ClientPipe.write(command, data);
|
||||
}
|
||||
|
||||
// Installs a callback for receiving commands from the process on the other end of the pipe
|
||||
void IPCPipe::On(std::string command, Utils::Slot<Pipe::PacketCallback> callback)
|
||||
{
|
||||
IPCPipe::ServerPipe.setCallback(command, callback);
|
||||
}
|
||||
|
||||
IPCPipe::IPCPipe()
|
||||
{
|
||||
if (Dedicated::IsEnabled()) return;
|
||||
|
||||
// Server pipe
|
||||
IPCPipe::ServerPipe.onConnect(IPCPipe::ConnectClient);
|
||||
IPCPipe::ServerPipe.create((Singleton::IsFirstInstance() ? IPC_PIPE_NAME_SERVER : IPC_PIPE_NAME_CLIENT));
|
||||
|
||||
// Connect second instance's client pipe to first instance's server pipe
|
||||
if (!Singleton::IsFirstInstance())
|
||||
{
|
||||
IPCPipe::ClientPipe.connect(IPC_PIPE_NAME_SERVER);
|
||||
}
|
||||
|
||||
IPCPipe::On("ping", [] (std::string data)
|
||||
{
|
||||
Logger::Print("Received ping form pipe, sending pong!\n");
|
||||
IPCPipe::Write("pong", data);
|
||||
});
|
||||
|
||||
IPCPipe::On("pong", [] (std::string data)
|
||||
{
|
||||
Logger::Print("Received pong form pipe!\n");
|
||||
});
|
||||
|
||||
// Test pipe functionality by sending pings
|
||||
Command::Add("ipcping", [] (Command::Params*)
|
||||
{
|
||||
Logger::Print("Sending ping to pipe!\n");
|
||||
IPCPipe::Write("ping", "");
|
||||
});
|
||||
}
|
||||
|
||||
void IPCPipe::preDestroy()
|
||||
{
|
||||
IPCPipe::ServerPipe.destroy();
|
||||
IPCPipe::ClientPipe.destroy();
|
||||
}
|
||||
}
|
83
src/Components/Modules/IPCPipe.hpp
Normal file
83
src/Components/Modules/IPCPipe.hpp
Normal file
@ -0,0 +1,83 @@
|
||||
#pragma once
|
||||
|
||||
#define IPC_MAX_RECONNECTS 3
|
||||
#define IPC_COMMAND_SIZE 100
|
||||
#define IPC_BUFFER_SIZE 0x2000
|
||||
|
||||
#define IPC_PIPE_NAME_SERVER "IW4x-Server"
|
||||
#define IPC_PIPE_NAME_CLIENT "IW4x-Client"
|
||||
|
||||
namespace Components
|
||||
{
|
||||
class Pipe
|
||||
{
|
||||
public:
|
||||
struct Packet
|
||||
{
|
||||
char command[IPC_COMMAND_SIZE];
|
||||
char buffer[IPC_BUFFER_SIZE];
|
||||
};
|
||||
|
||||
enum Type
|
||||
{
|
||||
IPCTYPE_NONE,
|
||||
IPCTYPE_SERVER,
|
||||
IPCTYPE_CLIENT
|
||||
};
|
||||
|
||||
typedef void(__cdecl PacketCallback)(std::string data);
|
||||
typedef void(__cdecl Callback)();
|
||||
|
||||
Pipe();
|
||||
~Pipe();
|
||||
|
||||
bool connect(std::string name);
|
||||
bool create(std::string name);
|
||||
|
||||
bool write(std::string command, std::string data);
|
||||
void setCallback(std::string command, Utils::Slot<PacketCallback> callback);
|
||||
void onConnect(Callback callback);
|
||||
|
||||
void destroy();
|
||||
|
||||
private:
|
||||
Utils::Slot<void()> connectCallback;
|
||||
std::map<std::string, Utils::Slot<PacketCallback>> packetCallbacks;
|
||||
|
||||
HANDLE pipe;
|
||||
std::thread thread;
|
||||
bool threadAttached;
|
||||
|
||||
Type type;
|
||||
Packet packet;
|
||||
|
||||
char pipeName[MAX_PATH];
|
||||
char pipeFile[MAX_PATH];
|
||||
unsigned int reconnectAttempt;
|
||||
|
||||
void setName(std::string name);
|
||||
|
||||
static void ReceiveThread(Pipe* pipe);
|
||||
};
|
||||
|
||||
class IPCPipe : public Component
|
||||
{
|
||||
public:
|
||||
IPCPipe();
|
||||
|
||||
#if defined(DEBUG) || defined(FORCE_UNIT_TESTS)
|
||||
const char* getName() override { return "IPCPipe"; };
|
||||
#endif
|
||||
|
||||
void preDestroy() override;
|
||||
|
||||
static bool Write(std::string command, std::string data);
|
||||
static void On(std::string command, Utils::Slot<Pipe::PacketCallback> callback);
|
||||
|
||||
private:
|
||||
static Pipe ServerPipe;
|
||||
static Pipe ClientPipe;
|
||||
|
||||
static void ConnectClient();
|
||||
};
|
||||
}
|
@ -19,11 +19,6 @@
|
||||
#include <Aclapi.h>
|
||||
#include <Psapi.h>
|
||||
|
||||
#pragma warning(push)
|
||||
#pragma warning(disable: 4996)
|
||||
#include <xutility>
|
||||
#pragma warning(pop)
|
||||
|
||||
#include <sstream>
|
||||
#include <fstream>
|
||||
#include <cctype>
|
||||
@ -98,7 +93,6 @@ template <size_t S> class Sizer { };
|
||||
#endif
|
||||
|
||||
#include "Utils/IO.hpp"
|
||||
#include "Utils/IPC.hpp"
|
||||
#include "Utils/CSV.hpp"
|
||||
#include "Utils/Time.hpp"
|
||||
#include "Utils/Cache.hpp"
|
||||
|
@ -1,144 +0,0 @@
|
||||
#include "STDInclude.hpp"
|
||||
|
||||
namespace Utils
|
||||
{
|
||||
namespace IPC
|
||||
{
|
||||
Channel::Channel(std::string _name, int _queueSize, int _bufferSize, bool _remove) : terminateQueue(false), remove(_remove), name(_name)
|
||||
{
|
||||
if(this->remove) boost::interprocess::message_queue::remove(this->name.data());
|
||||
queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), _queueSize, _bufferSize + sizeof(Channel::Header)));
|
||||
|
||||
this->queueThread = std::thread(&Channel::queueWorker, this);
|
||||
}
|
||||
|
||||
Channel::~Channel()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->queueMutex);
|
||||
this->terminateQueue = true;
|
||||
this->queueEvent.notify_all();
|
||||
lock.unlock();
|
||||
|
||||
if (this->queueThread.joinable())
|
||||
{
|
||||
this->queueThread.join();
|
||||
}
|
||||
|
||||
if (this->remove) boost::interprocess::message_queue::remove(this->name.data());
|
||||
}
|
||||
|
||||
bool Channel::receive(std::string* data)
|
||||
{
|
||||
if (!data) return false;
|
||||
data->clear();
|
||||
|
||||
if(this->packet.size() < this->queue->get_max_msg_size())
|
||||
{
|
||||
packet.resize(this->queue->get_max_msg_size());
|
||||
}
|
||||
|
||||
const Channel::Header* header = reinterpret_cast<const Channel::Header*>(packet.data());
|
||||
const char* buffer = reinterpret_cast<const char*>(header + 1);
|
||||
|
||||
unsigned int priority;
|
||||
boost::interprocess::message_queue::size_type recvd_size;
|
||||
|
||||
if (this->queue->try_receive(const_cast<char*>(packet.data()), packet.size(), recvd_size, priority))
|
||||
{
|
||||
if ((recvd_size - sizeof(Channel::Header)) != header->fragmentSize || header->fragmentPart) return false;
|
||||
data->append(buffer, header->fragmentSize);
|
||||
|
||||
if(header->fragmented)
|
||||
{
|
||||
Channel::Header mainHeader = *header;
|
||||
unsigned int part = mainHeader.fragmentPart;
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (!this->queue->try_receive(const_cast<char*>(packet.data()), packet.size(), recvd_size, priority)) return false;
|
||||
|
||||
if (header->packetId != mainHeader.packetId || header->totalSize != mainHeader.totalSize || header->fragmentPart != (++part))
|
||||
{
|
||||
data->clear();
|
||||
return false;
|
||||
}
|
||||
|
||||
data->append(buffer, header->fragmentSize);
|
||||
|
||||
if(header->totalSize <= data->size())
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void Channel::send(std::string data)
|
||||
{
|
||||
const size_t fragmentSize = (this->queue->get_max_msg_size() - sizeof(Channel::Header)) - 1;
|
||||
|
||||
Channel::Header header;
|
||||
header.fragmented = (data.size() + sizeof(Channel::Header)) > this->queue->get_max_msg_size();
|
||||
header.packetId = static_cast<short>(timeGetTime() + rand());;
|
||||
header.totalSize = data.size();
|
||||
|
||||
size_t sentSize = 0;
|
||||
for (unsigned short i = 0; sentSize < data.size(); ++i)
|
||||
{
|
||||
header.fragmentPart = i;
|
||||
header.fragmentSize = std::min(fragmentSize, data.size() - (fragmentSize * i));
|
||||
|
||||
std::string buffer;
|
||||
buffer.append(reinterpret_cast<char*>(&header), sizeof(Channel::Header));
|
||||
buffer.append(data.data() + sentSize, header.fragmentSize);
|
||||
Channel::enqueueMessage(buffer);
|
||||
|
||||
sentSize += header.fragmentSize;
|
||||
}
|
||||
}
|
||||
|
||||
void Channel::enqueueMessage(std::string data)
|
||||
{
|
||||
if (data.size() <= this->queue->get_max_msg_size())
|
||||
{
|
||||
std::lock_guard<std::mutex> _(this->queueMutex);
|
||||
this->internalQueue.push(data);
|
||||
this->queueEvent.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
void Channel::queueWorker()
|
||||
{
|
||||
while (!this->terminateQueue)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->queueMutex);
|
||||
|
||||
while(!this->terminateQueue && this->internalQueue.empty())
|
||||
{
|
||||
this->queueEvent.wait(lock);
|
||||
}
|
||||
|
||||
while(!this->terminateQueue && !this->internalQueue.empty())
|
||||
{
|
||||
std::string data = this->internalQueue.front();
|
||||
this->internalQueue.pop();
|
||||
|
||||
if (data.size() <= this->queue->get_max_msg_size())
|
||||
{
|
||||
while (!this->terminateQueue && !this->queue->try_send(data.data(), data.size(), 0))
|
||||
{
|
||||
lock.unlock();
|
||||
std::this_thread::sleep_for(1000us);
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,106 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
namespace boost
|
||||
{
|
||||
typedef unsigned long long ulong_long_type;
|
||||
}
|
||||
|
||||
#pragma warning(push)
|
||||
#pragma warning(disable: 4091)
|
||||
#pragma warning(disable: 4996)
|
||||
#pragma warning(disable: 6248)
|
||||
#pragma warning(disable: 6282)
|
||||
#pragma warning(disable: 6285)
|
||||
#pragma warning(disable: 6388)
|
||||
#pragma warning(disable: 28159)
|
||||
#define _SCL_SECURE_NO_WARNINGS
|
||||
|
||||
#ifdef sleep
|
||||
#undef sleep
|
||||
#endif
|
||||
|
||||
//#define BOOST_DISABLE_WIN32
|
||||
#define BOOST_USE_WINDOWS_H
|
||||
#define BOOST_DATE_TIME_NO_LIB
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
#undef _SCL_SECURE_NO_WARNINGS
|
||||
#pragma warning(pop)
|
||||
|
||||
namespace Utils
|
||||
{
|
||||
namespace IPC
|
||||
{
|
||||
class Channel
|
||||
{
|
||||
public:
|
||||
Channel(std::string _name, int _queueSize = 100, int _bufferSize = 1024, bool _remove = false);
|
||||
~Channel();
|
||||
|
||||
bool receive(std::string* data);
|
||||
void send(std::string data);
|
||||
|
||||
private:
|
||||
struct Header
|
||||
{
|
||||
bool fragmented;
|
||||
unsigned short packetId;
|
||||
size_t fragmentSize;
|
||||
size_t totalSize;
|
||||
unsigned int fragmentPart;
|
||||
};
|
||||
|
||||
void enqueueMessage(std::string data);
|
||||
void queueWorker();
|
||||
|
||||
bool terminateQueue;
|
||||
std::condition_variable queueEvent;
|
||||
std::mutex queueMutex;
|
||||
std::thread queueThread;
|
||||
std::queue<std::string> internalQueue;
|
||||
|
||||
bool remove;
|
||||
std::unique_ptr<boost::interprocess::message_queue> queue;
|
||||
std::string packet;
|
||||
std::string name;
|
||||
};
|
||||
|
||||
class BidirectionalChannel
|
||||
{
|
||||
public:
|
||||
BidirectionalChannel(std::string name, bool server, int queueSize = 100, int bufferSize = 1024) : isServer(server),
|
||||
channel1(name, queueSize, bufferSize, server),
|
||||
channel2(name + "2", queueSize, bufferSize, server)
|
||||
{}
|
||||
|
||||
bool receive(std::string* data)
|
||||
{
|
||||
if(this->isServer)
|
||||
{
|
||||
return channel1.receive(data);
|
||||
}
|
||||
else
|
||||
{
|
||||
return channel2.receive(data);
|
||||
}
|
||||
}
|
||||
|
||||
void send(std::string data)
|
||||
{
|
||||
if (this->isServer)
|
||||
{
|
||||
return channel2.send(data);
|
||||
}
|
||||
else
|
||||
{
|
||||
return channel1.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
const bool isServer;
|
||||
Channel channel1;
|
||||
Channel channel2;
|
||||
};
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user