[Node] Update protocol to version 4

With better thread synchronization!
This commit is contained in:
momo5502 2017-01-14 11:33:27 +01:00
parent 8622dbedc7
commit ef3fb631d5
2 changed files with 53 additions and 66 deletions

View File

@ -2,7 +2,7 @@
namespace Components namespace Components
{ {
std::mutex Node::NodeMutex; std::recursive_mutex Node::NodeMutex;
std::mutex Node::SessionMutex; std::mutex Node::SessionMutex;
Utils::Cryptography::ECC::Key Node::SignatureKey; Utils::Cryptography::ECC::Key Node::SignatureKey;
std::vector<Node::NodeEntry> Node::Nodes; std::vector<Node::NodeEntry> Node::Nodes;
@ -61,7 +61,7 @@ namespace Components
// However, defining another proto message due to this would be redundant. // However, defining another proto message due to this would be redundant.
//list.set_is_dedi(Dedicated::IsDedicated()); //list.set_is_dedi(Dedicated::IsDedicated());
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto node : Node::Nodes) for (auto node : Node::Nodes)
{ {
if (node.state == Node::STATE_VALID && node.registered) if (node.state == Node::STATE_VALID && node.registered)
@ -75,6 +75,8 @@ namespace Components
Node::NodeEntry* Node::FindNode(Network::Address address) Node::NodeEntry* Node::FindNode(Network::Address address)
{ {
std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto i = Node::Nodes.begin(); i != Node::Nodes.end(); ++i) for (auto i = Node::Nodes.begin(); i != Node::Nodes.end(); ++i)
{ {
if (i->address == address) if (i->address == address)
@ -101,7 +103,7 @@ namespace Components
unsigned int Node::GetValidNodeCount() unsigned int Node::GetValidNodeCount()
{ {
unsigned int count = 0; unsigned int count = 0;
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto node : Node::Nodes) for (auto node : Node::Nodes)
{ {
@ -122,7 +124,7 @@ namespace Components
if (!address.isValid() || address.isLocal() || address.isSelf()) return; if (!address.isValid() || address.isLocal() || address.isSelf()) return;
#endif #endif
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* existingEntry = Node::FindNode(address); Node::NodeEntry* existingEntry = Node::FindNode(address);
if (existingEntry) if (existingEntry)
{ {
@ -157,7 +159,7 @@ namespace Components
list.set_protocol(PROTOCOL); list.set_protocol(PROTOCOL);
list.set_version(NODE_VERSION); list.set_version(NODE_VERSION);
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto& node : Node::Nodes) for (auto& node : Node::Nodes)
{ {
@ -196,7 +198,7 @@ namespace Components
void Node::DeleteInvalidNodes() void Node::DeleteInvalidNodes()
{ {
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
std::vector<Node::NodeEntry> cleanNodes; std::vector<Node::NodeEntry> cleanNodes;
for (auto node : Node::Nodes) for (auto node : Node::Nodes)
@ -223,7 +225,7 @@ namespace Components
void Node::SyncNodeList() void Node::SyncNodeList()
{ {
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto& node : Node::Nodes) for (auto& node : Node::Nodes)
{ {
if (node.state == Node::STATE_VALID && node.registered) if (node.state == Node::STATE_VALID && node.registered)
@ -275,7 +277,7 @@ namespace Components
int listQueryCount = 0; int listQueryCount = 0;
{ {
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto &node : Node::Nodes) for (auto &node : Node::Nodes)
{ {
// TODO: Decide how to handle nodes that were already registered, but timed out re-registering. // TODO: Decide how to handle nodes that were already registered, but timed out re-registering.
@ -397,7 +399,7 @@ namespace Components
packet.set_challenge(challenge); packet.set_challenge(challenge);
packet.set_signature(Utils::Cryptography::ECC::SignMessage(Node::SignatureKey, challenge)); packet.set_signature(Utils::Cryptography::ECC::SignMessage(Node::SignatureKey, challenge));
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto node : Node::Nodes) for (auto node : Node::Nodes)
{ {
Network::SendCommand(node.address, "nodeDeregister", packet.SerializeAsString()); Network::SendCommand(node.address, "nodeDeregister", packet.SerializeAsString());
@ -417,7 +419,7 @@ namespace Components
if (!Node::FindNode(address)) return; if (!Node::FindNode(address)) return;
} }
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
#if defined(DEBUG) && !defined(DISABLE_NODE_LOG) #if defined(DEBUG) && !defined(DISABLE_NODE_LOG)
@ -458,7 +460,7 @@ namespace Components
{ {
if (Dvar::Var("sv_lanOnly").get<bool>()) return; if (Dvar::Var("sv_lanOnly").get<bool>()) return;
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (!entry || entry->state != Node::STATE_NEGOTIATING) return; if (!entry || entry->state != Node::STATE_NEGOTIATING) return;
@ -519,7 +521,7 @@ namespace Components
if (Dvar::Var("sv_lanOnly").get<bool>()) return; if (Dvar::Var("sv_lanOnly").get<bool>()) return;
// Ignore requests from nodes we don't know // Ignore requests from nodes we don't know
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (!entry || entry->state != Node::STATE_NEGOTIATING) return; if (!entry || entry->state != Node::STATE_NEGOTIATING) return;
@ -556,32 +558,30 @@ namespace Components
} }
}); });
Network::Handle("nodeListRequest", [] (Network::Address address, std::string data) Network::Handle("nodeListRequest", [](Network::Address address, std::string data)
{ {
if (Dvar::Var("sv_lanOnly").get<bool>()) return; if (Dvar::Var("sv_lanOnly").get<bool>()) return;
// Check if this is a registered node // Check if this is a registered node
bool allowed = false; bool allowed = false;
std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address);
if (entry && entry->registered)
{ {
std::lock_guard<std::mutex> _(Node::NodeMutex); entry->lastTime = Game::Sys_Milliseconds();
Node::NodeEntry* entry = Node::FindNode(address); allowed = true;
if (entry && entry->registered) }
{
entry->lastTime = Game::Sys_Milliseconds();
allowed = true;
}
// Check if there is any open session // Check if there is any open session
if (!allowed) if (!allowed)
{
std::lock_guard<std::mutex> __(Node::SessionMutex);
Node::ClientSession* session = Node::FindSession(address);
if (session)
{ {
std::lock_guard<std::mutex> __(Node::SessionMutex); session->lastTime = Game::Sys_Milliseconds();
Node::ClientSession* session = Node::FindSession(address); allowed = session->valid;
if (session)
{
session->lastTime = Game::Sys_Milliseconds();
allowed = session->valid;
}
} }
} }
@ -603,7 +603,7 @@ namespace Components
{ {
if (Dvar::Var("sv_lanOnly").get<bool>()) return; if (Dvar::Var("sv_lanOnly").get<bool>()) return;
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (!entry || !entry->registered) return; if (!entry || !entry->registered) return;
@ -691,9 +691,9 @@ namespace Components
} }
else else
{ {
Network::Handle("sessionInitialize", [] (Network::Address address, std::string data) Network::Handle("sessionInitialize", [](Network::Address address, std::string data)
{ {
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (!entry) return; if (!entry) return;
@ -705,17 +705,15 @@ namespace Components
Network::SendCommand(address, "sessionSynchronize", data); Network::SendCommand(address, "sessionSynchronize", data);
}); });
Network::Handle("sessionAcknowledge", [] (Network::Address address, std::string data) Network::Handle("sessionAcknowledge", [](Network::Address address, std::string data)
{ {
{ std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
std::lock_guard<std::mutex> _(Node::NodeMutex); Node::NodeEntry* entry = Node::FindNode(address);
Node::NodeEntry* entry = Node::FindNode(address); if (!entry) return;
if (!entry) return;
entry->state = Node::STATE_VALID; entry->state = Node::STATE_VALID;
entry->registered = true; entry->registered = true;
entry->lastTime = Game::Sys_Milliseconds(); entry->lastTime = Game::Sys_Milliseconds();
}
#if defined(DEBUG) && !defined(DISABLE_NODE_LOG) #if defined(DEBUG) && !defined(DISABLE_NODE_LOG)
Logger::Print("Session acknowledged by %s, synchronizing node list...\n", address.getCString()); Logger::Print("Session acknowledged by %s, synchronizing node list...\n", address.getCString());
@ -728,6 +726,7 @@ namespace Components
Network::Handle("nodeListResponse", [] (Network::Address address, std::string data) Network::Handle("nodeListResponse", [] (Network::Address address, std::string data)
{ {
Proto::Node::List list; Proto::Node::List list;
std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
if (data.empty() || !list.ParseFromString(data)) if (data.empty() || !list.ParseFromString(data))
{ {
@ -737,7 +736,6 @@ namespace Components
return; return;
} }
Node::NodeMutex.lock();
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (entry) if (entry)
{ {
@ -758,7 +756,6 @@ namespace Components
if (entry->version < NODE_VERSION) if (entry->version < NODE_VERSION)
{ {
entry->state = Node::STATE_INVALID; entry->state = Node::STATE_INVALID;
Node::NodeMutex.unlock();
return; return;
} }
#endif #endif
@ -768,8 +765,6 @@ namespace Components
ServerList::InsertRequest(entry->address, true); ServerList::InsertRequest(entry->address, true);
} }
Node::NodeMutex.unlock();
for (int i = 0; i < list.address_size(); ++i) for (int i = 0; i < list.address_size(); ++i)
{ {
Network::Address _addr(list.address(i)); Network::Address _addr(list.address(i));
@ -790,17 +785,10 @@ namespace Components
Node::AddNode(_addr); Node::AddNode(_addr);
} }
} }
else
{
Node::NodeMutex.unlock();
}
} }
else else
{ {
Node::NodeMutex.unlock();
//Node::AddNode(address); //Node::AddNode(address);
std::lock_guard<std::mutex> _(Node::SessionMutex);
Node::ClientSession* session = Node::FindSession(address); Node::ClientSession* session = Node::FindSession(address);
if (session && session->valid) if (session && session->valid)
{ {
@ -820,7 +808,7 @@ namespace Components
{ {
if (Dedicated::IsEnabled()) if (Dedicated::IsEnabled())
{ {
Node::NodeMutex.lock(); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (entry) if (entry)
{ {
@ -828,12 +816,9 @@ namespace Components
entry->lastTime = Game::Sys_Milliseconds(); entry->lastTime = Game::Sys_Milliseconds();
entry->registered = false; entry->registered = false;
entry->state = Node::STATE_UNKNOWN; entry->state = Node::STATE_UNKNOWN;
Node::NodeMutex.unlock();
} }
else else
{ {
Node::NodeMutex.unlock();
// Add as new entry to perform registration // Add as new entry to perform registration
Node::AddNode(address); Node::AddNode(address);
} }
@ -844,7 +829,7 @@ namespace Components
{ {
Logger::Print("Nodes: %d (%d)\n", Node::Nodes.size(), Node::GetValidNodeCount()); Logger::Print("Nodes: %d (%d)\n", Node::Nodes.size(), Node::GetValidNodeCount());
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto node : Node::Nodes) for (auto node : Node::Nodes)
{ {
Logger::Print("%s\t(%s)\n", node.address.getCString(), Node::GetStateName(node.state)); Logger::Print("%s\t(%s)\n", node.address.getCString(), Node::GetStateName(node.state));
@ -858,7 +843,7 @@ namespace Components
Network::Address address(params->get(1)); Network::Address address(params->get(1));
Node::AddNode(address); Node::AddNode(address);
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::NodeEntry* entry = Node::FindNode(address); Node::NodeEntry* entry = Node::FindNode(address);
if (entry) if (entry)
{ {
@ -873,7 +858,7 @@ namespace Components
Node::LoadNodeRemotePreset(); Node::LoadNodeRemotePreset();
std::lock_guard<std::mutex> _(Node::NodeMutex); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
for (auto& node : Node::Nodes) for (auto& node : Node::Nodes)
{ {
node.state = Node::STATE_UNKNOWN; node.state = Node::STATE_UNKNOWN;
@ -898,12 +883,14 @@ namespace Components
Node::~Node() Node::~Node()
{ {
Node::SignatureKey.free(); Node::SignatureKey.free();
Node::StoreNodes(true); Node::StoreNodes(true);
std::lock_guard<std::mutex> _(Node::NodeMutex);
std::lock_guard<std::mutex> __(Node::SessionMutex); {
Node::Nodes.clear(); std::lock_guard<std::recursive_mutex> _(Node::NodeMutex);
Node::Sessions.clear(); std::lock_guard<std::mutex> __(Node::SessionMutex);
Node::Nodes.clear();
Node::Sessions.clear();
}
} }
bool Node::unitTest() bool Node::unitTest()

View File

@ -7,7 +7,7 @@
#define NODE_STORE_INTERVAL 1000 * 60* 1 // Store nodes every minute #define NODE_STORE_INTERVAL 1000 * 60* 1 // Store nodes every minute
#define SESSION_TIMEOUT 1000 * 10 // 10 seconds session timeout #define SESSION_TIMEOUT 1000 * 10 // 10 seconds session timeout
#define NODE_VERSION 3 #define NODE_VERSION 4
namespace Components namespace Components
{ {
@ -71,7 +71,7 @@ namespace Components
static Utils::Cryptography::ECC::Key SignatureKey; static Utils::Cryptography::ECC::Key SignatureKey;
static std::mutex NodeMutex; static std::recursive_mutex NodeMutex;
static std::mutex SessionMutex; static std::mutex SessionMutex;
static std::vector<NodeEntry> Nodes; static std::vector<NodeEntry> Nodes;
static std::vector<ClientSession> Sessions; static std::vector<ClientSession> Sessions;