[IPC] Basic IPC channel class

This commit is contained in:
momo5502 2017-01-27 17:21:10 +01:00
parent ef01b4e617
commit fc0bf1acc1
6 changed files with 200 additions and 50 deletions

View File

@ -1,22 +1,5 @@
#include "STDInclude.hpp"
namespace boost
{
typedef unsigned long long ulong_long_type;
}
#pragma warning(push)
#pragma warning(disable: 4091)
#pragma warning(disable: 4996)
#undef sleep
//#define BOOST_DISABLE_WIN32
#define BOOST_USE_WINDOWS_H
#define BOOST_DATE_TIME_NO_LIB
#include <boost/interprocess/ipc/message_queue.hpp>
#pragma warning(pop)
namespace Components
{
Pipe IPCPipe::ServerPipe;
@ -223,38 +206,6 @@ namespace Components
IPCPipe::IPCPipe()
{
Command::Add("mq1", [](Command::Params*)
{
boost::interprocess::message_queue::remove("message_queue");
//Create a message_queue.
boost::interprocess::message_queue mq(boost::interprocess::create_only, "message_queue", 100, sizeof(int));
int i = 1;
mq.send(&i, sizeof(i), 0);
});
Command::Add("mq2", [](Command::Params*)
{
boost::interprocess::message_queue mq(boost::interprocess::open_only, "message_queue");
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
//Receive 100 numbers
for (int i = 0; i < 3; ++i) {
int number;
if (!mq.try_receive(&number, sizeof(number), recvd_size, priority) || recvd_size != sizeof(number))
{
Logger::Print("Nothing received\n");
}
else
{
Logger::Print("Rec: %d\n", number);
}
}
});
if (Dedicated::IsEnabled()) return;
// Server pipe
@ -285,6 +236,24 @@ namespace Components
IPCPipe::Write("ping", "");
});
static Utils::IPC::Channel channel("iw4xChannel");
static Utils::IPC::Channel channel2("iw4xChannel2");
channel.send("Hello world!");
Command::Add("ipcchan", [](Command::Params* params)
{
channel.send(params->join(1));
});
QuickPatch::OnFrame([]()
{
std::string buffer;
if(channel2.receive(&buffer))
{
Logger::Print("Data received: %s\n", buffer.data());
}
});
STARTUPINFOA sInfo;
PROCESS_INFORMATION pInfo;

View File

@ -18,6 +18,11 @@
#include <d3d9.h>
#include <Aclapi.h>
#pragma warning(push)
#pragma warning(disable: 4996)
#include <xutility>
#pragma warning(pop)
#include <sstream>
#include <fstream>
#include <cctype>
@ -89,6 +94,7 @@ template <size_t S> class Sizer { };
#endif
#include "Utils/IO.hpp"
#include "Utils/IPC.hpp"
#include "Utils/CSV.hpp"
#include "Utils/Time.hpp"
#include "Utils/Cache.hpp"

104
src/Utils/IPC.cpp Normal file
View File

@ -0,0 +1,104 @@
#include "STDInclude.hpp"
namespace Utils
{
namespace IPC
{
Channel::Channel(std::string _name, int queueSize, int bufferSize) : name(_name)
{
//boost::interprocess::message_queue::remove(this->name.data());
queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), queueSize, bufferSize + sizeof(Channel::Header)));
}
Channel::~Channel()
{
boost::interprocess::message_queue::remove(this->name.data());
}
bool Channel::receive(std::string* data)
{
if (!data) return false;
data->clear();
if(this->packet.size() < this->queue->get_max_msg_size())
{
packet.resize(this->queue->get_max_msg_size());
}
const Channel::Header* header = reinterpret_cast<const Channel::Header*>(packet.data());
const char* buffer = reinterpret_cast<const char*>(header + 1);
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
if (this->queue->try_receive(const_cast<char*>(packet.data()), packet.size(), recvd_size, priority))
{
if ((recvd_size - sizeof(Channel::Header)) != header->fragmentSize || header->fragmentPart) return false;
data->append(buffer, header->fragmentSize);
if(header->fragmented)
{
Channel::Header mainHeader = *header;
unsigned int part = mainHeader.fragmentPart;
while (true)
{
this->queue->receive(const_cast<char*>(packet.data()), packet.size(), recvd_size, priority);
if (header->packetId != mainHeader.packetId || header->totalSize != mainHeader.totalSize || header->fragmentPart != (++part))
{
data->clear();
return false;
}
data->append(buffer, header->fragmentSize);
if(header->totalSize <= data->size())
{
break;
}
}
}
return true;
}
return false;
}
void Channel::send(std::string data)
{
const size_t fragmentSize = (this->queue->get_max_msg_size() - sizeof(Channel::Header)) - 1;
Channel::Header header;
header.fragmented = (data.size() + sizeof(Channel::Header)) > this->queue->get_max_msg_size();
header.packetId = static_cast<short>(timeGetTime() + rand());;
header.totalSize = data.size();
size_t sentSize = 0;
for (unsigned short i = 0; sentSize < data.size(); ++i)
{
header.fragmentPart = i;
header.fragmentSize = std::min(fragmentSize, data.size() - (fragmentSize * i));
std::string buffer;
buffer.append(reinterpret_cast<char*>(&header), sizeof(Channel::Header));
buffer.append(data.data() + sentSize, header.fragmentSize);
Channel::sendMessage(buffer);
sentSize += header.fragmentSize;
}
}
void Channel::sendMessage(std::string data)
{
if (data.size() <= this->queue->get_max_msg_size())
{
while (!this->queue->try_send(data.data(), data.size(), 0))
{
std::this_thread::sleep_for(100us);
}
}
}
}
}

55
src/Utils/IPC.hpp Normal file
View File

@ -0,0 +1,55 @@
#pragma once
namespace boost
{
typedef unsigned long long ulong_long_type;
}
#pragma warning(push)
#pragma warning(disable: 4091)
#pragma warning(disable: 4996)
#define _SCL_SECURE_NO_WARNINGS
#ifdef sleep
#undef sleep
#endif
//#define BOOST_DISABLE_WIN32
#define BOOST_USE_WINDOWS_H
#define BOOST_DATE_TIME_NO_LIB
#include <boost/interprocess/ipc/message_queue.hpp>
#undef _SCL_SECURE_NO_WARNINGS
#pragma warning(pop)
namespace Utils
{
namespace IPC
{
class Channel
{
public:
Channel(std::string _name, int queueSize = 100, int bufferSize = 20);
~Channel();
bool receive(std::string* data);
void send(std::string data);
private:
struct Header
{
bool fragmented;
unsigned short packetId;
size_t fragmentSize;
size_t totalSize;
unsigned int fragmentPart;
};
void sendMessage(std::string data);
std::unique_ptr<boost::interprocess::message_queue> queue;
std::string packet;
std::string name;
};
}
}

View File

@ -4,9 +4,18 @@
void worker(bool* terminator)
{
printf("Worker started\n");
Utils::IPC::Channel channel("iw4xChannel");
Utils::IPC::Channel channel2("iw4xChannel2");
while(!*terminator)
{
std::string buffer;
if(channel.receive(&buffer))
{
printf("Data received: %s\n", buffer.data());
channel2.send("OK " + buffer);
}
std::this_thread::sleep_for(1ms);
}
@ -51,7 +60,8 @@ int main()
if (runner.joinable()) runner.join();
printf("Worker terminated\n");
_getch();
//_getch();
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -18,6 +18,11 @@
#include <d3d9.h>
#include <Aclapi.h>
#pragma warning(push)
#pragma warning(disable: 4996)
#include <xutility>
#pragma warning(pop)
#include <sstream>
#include <fstream>
#include <cctype>
@ -72,6 +77,7 @@ template <size_t S> class Sizer { };
#pragma warning(pop)
#include "../Utils/IO.hpp"
#include "../Utils/IPC.hpp"
#include "../Utils/Time.hpp"
#include "../Utils/Chain.hpp"
#include "../Utils/Utils.hpp"