[ConnectProtocol] Upgrade the protocol to the new ipc infrastructure

This commit is contained in:
momo5502 2017-01-27 23:09:32 +01:00
parent 559195c5cc
commit ee185c538f
5 changed files with 69 additions and 20 deletions

View File

@ -212,10 +212,10 @@ namespace Components
ConnectProtocol::ConnectProtocol()
{
// IPC handler
// IPCPipe::On("connect", [] (std::string data)
// {
// Command::Execute(Utils::String::VA("connect %s", data.data()), false);
// });
IPCHandler::OnClient("connect", [] (std::string data)
{
Command::Execute(Utils::String::VA("connect %s", data.data()), false);
});
// Invocation handler
QuickPatch::Once(ConnectProtocol::Invocation);
@ -229,7 +229,7 @@ namespace Components
{
if (!Singleton::IsFirstInstance())
{
//IPCPipe::Write("connect", ConnectProtocol::ConnectString);
IPCHandler::SendClient("connect", ConnectProtocol::ConnectString);
ExitProcess(0);
}
else

View File

@ -12,12 +12,22 @@ namespace Components
{
IPCHandler::InitChannels();
Proto::IPC::Command command;
command.set_command(message);
command.set_data(data);
IPCHandler::WorkerChannel->send(command.SerializeAsString());
}
void IPCHandler::SendClient(std::string message, std::string data)
{
IPCHandler::InitChannels();
//IPCHandler::ClientChannel->send()
Proto::IPC::Command command;
command.set_command(message);
command.set_data(data);
IPCHandler::ClientChannel->send(command.SerializeAsString());
}
void IPCHandler::OnWorker(std::string message, IPCHandler::Callback callback)
@ -58,6 +68,44 @@ namespace Components
if (pInfo.hProcess && pInfo.hProcess != INVALID_HANDLE_VALUE) CloseHandle(pInfo.hProcess);
}
void IPCHandler::HandleClient()
{
IPCHandler::InitChannels();
std::string packet;
if(IPCHandler::ClientChannel->receive(&packet))
{
Proto::IPC::Command command;
if(command.ParseFromString(packet))
{
auto callback = IPCHandler::ClientCallbacks.find(command.command());
if (callback != IPCHandler::ClientCallbacks.end())
{
callback->second(command.data());
}
}
}
}
void IPCHandler::HandleWorker()
{
IPCHandler::InitChannels();
std::string packet;
if (IPCHandler::WorkerChannel->receive(&packet))
{
Proto::IPC::Command command;
if (command.ParseFromString(packet))
{
auto callback = IPCHandler::WorkerCallbacks.find(command.command());
if (callback != IPCHandler::WorkerCallbacks.end())
{
callback->second(command.data());
}
}
}
}
IPCHandler::IPCHandler()
{
if (Dedicated::IsEnabled()) return;
@ -67,11 +115,8 @@ namespace Components
QuickPatch::OnFrame([]()
{
std::string buffer;
if(IPCHandler::WorkerChannel->receive(&buffer))
{
Logger::Print("Data received: %s\n", buffer.data());
}
IPCHandler::HandleWorker();
IPCHandler::HandleClient();
});
}

View File

@ -14,11 +14,11 @@ namespace Components
const char* getName() override { return "IPCHandler"; };
#endif
void SendWorker(std::string message, std::string data);
void SendClient(std::string message, std::string data);
static void SendWorker(std::string message, std::string data);
static void SendClient(std::string message, std::string data);
void OnWorker(std::string message, Callback callback);
void OnClient(std::string message, Callback callback);
static void OnWorker(std::string message, Callback callback);
static void OnClient(std::string message, Callback callback);
private:
static std::unique_ptr<Utils::IPC::BidirectionalChannel> WorkerChannel;
@ -29,5 +29,8 @@ namespace Components
static void InitChannels();
static void StartWorker();
static void HandleClient();
static void HandleWorker();
};
}

View File

@ -4,15 +4,15 @@ namespace Utils
{
namespace IPC
{
Channel::Channel(std::string _name, int queueSize, int bufferSize, bool creator) : name(_name)
Channel::Channel(std::string _name, int _queueSize, int _bufferSize, bool _remove) : name(_name), remove(_remove)
{
if(creator) boost::interprocess::message_queue::remove(this->name.data());
queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), queueSize, bufferSize + sizeof(Channel::Header)));
if(this->remove) boost::interprocess::message_queue::remove(this->name.data());
queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), _queueSize, _bufferSize + sizeof(Channel::Header)));
}
Channel::~Channel()
{
boost::interprocess::message_queue::remove(this->name.data());
if (this->remove) boost::interprocess::message_queue::remove(this->name.data());
}
bool Channel::receive(std::string* data)

View File

@ -29,7 +29,7 @@ namespace Utils
class Channel
{
public:
Channel(std::string _name, int queueSize = 100, int bufferSize = 1024, bool creator = false);
Channel(std::string _name, int _queueSize = 100, int _bufferSize = 1024, bool _remove = false);
~Channel();
bool receive(std::string* data);
@ -47,6 +47,7 @@ namespace Utils
void sendMessage(std::string data);
bool remove;
std::unique_ptr<boost::interprocess::message_queue> queue;
std::string packet;
std::string name;