[IPC] Experimental second handler

This commit is contained in:
momo5502 2017-01-28 19:21:55 +01:00
parent 20139d4a52
commit f81f820b47
2 changed files with 26 additions and 3 deletions

View File

@ -9,13 +9,16 @@ namespace Utils
if(this->remove) boost::interprocess::message_queue::remove(this->name.data()); if(this->remove) boost::interprocess::message_queue::remove(this->name.data());
queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), _queueSize, _bufferSize + sizeof(Channel::Header))); queue.reset(new boost::interprocess::message_queue(boost::interprocess::open_or_create, this->name.data(), _queueSize, _bufferSize + sizeof(Channel::Header)));
this->queueThread = std::thread(&Channel::queueWorker, this); this->queueThread = std::thread(&Channel::queueWorker2, this);
} }
Channel::~Channel() Channel::~Channel()
{ {
{
std::lock_guard<std::mutex> _(this->queueMutex);
this->terminateQueue = true; this->terminateQueue = true;
this->queueEvent.notify_all(); this->queueEvent.notify_all();
}
if(this->queueThread.joinable()) if(this->queueThread.joinable())
{ {
@ -110,6 +113,25 @@ namespace Utils
} }
} }
void Channel::queueWorker2()
{
while (!this->terminateQueue)
{
if(!this->internalQueue.empty())
{
std::lock_guard<std::mutex> lock(this->queueMutex);
std::string data = this->internalQueue.front();
if(this->queue->try_send(data.data(), data.size(), 0))
{
this->internalQueue.pop();
}
}
std::this_thread::sleep_for(1000us);
}
}
void Channel::queueWorker() void Channel::queueWorker()
{ {
while (!this->terminateQueue) while (!this->terminateQueue)

View File

@ -47,6 +47,7 @@ namespace Utils
void enqueueMessage(std::string data); void enqueueMessage(std::string data);
void queueWorker(); void queueWorker();
void queueWorker2();
bool terminateQueue; bool terminateQueue;
std::condition_variable queueEvent; std::condition_variable queueEvent;