From e29377580b71046469a4791d795f67afcf18a862 Mon Sep 17 00:00:00 2001 From: momo5502 Date: Sat, 17 Sep 2016 18:51:18 +0200 Subject: [PATCH] Add thread synchronization for nodes --- src/Components/Modules/Node.cpp | 190 ++++++++++++++++++++------------ src/Components/Modules/Node.hpp | 4 +- 2 files changed, 125 insertions(+), 69 deletions(-) diff --git a/src/Components/Modules/Node.cpp b/src/Components/Modules/Node.cpp index 96e7fe07..e5483048 100644 --- a/src/Components/Modules/Node.cpp +++ b/src/Components/Modules/Node.cpp @@ -2,6 +2,8 @@ namespace Components { + std::mutex Node::NodeMutex; + std::mutex Node::SessionMutex; Utils::Cryptography::ECC::Key Node::SignatureKey; std::vector Node::Nodes; std::vector Node::Sessions; @@ -45,6 +47,7 @@ namespace Components // However, defining another proto message due to this would be redundant. //list.set_is_dedi(Dedicated::IsDedicated()); + std::lock_guard _(Node::NodeMutex); for (auto node : Node::Nodes) { if (node.state == Node::STATE_VALID && node.registered) @@ -88,6 +91,7 @@ namespace Components unsigned int Node::GetValidNodeCount() { unsigned int count = 0; + std::lock_guard _(Node::NodeMutex); for (auto node : Node::Nodes) { @@ -108,6 +112,7 @@ namespace Components if (!address.IsValid() || address.IsLocal() || address.IsSelf()) return; #endif + std::lock_guard _(Node::NodeMutex); Node::NodeEntry* existingEntry = Node::FindNode(address); if (existingEntry) { @@ -142,7 +147,9 @@ namespace Components list.set_protocol(PROTOCOL); list.set_version(NODE_VERSION); - for (auto node : Node::Nodes) + std::lock_guard _(Node::NodeMutex); + + for (auto& node : Node::Nodes) { if (node.state == Node::STATE_VALID && node.registered) { @@ -163,6 +170,7 @@ namespace Components void Node::DeleteInvalidSessions() { + std::lock_guard _(Node::SessionMutex); for (auto i = Node::Sessions.begin(); i != Node::Sessions.end();) { if (i->lastTime <= 0 || (Game::Sys_Milliseconds() - i->lastTime) > SESSION_TIMEOUT) @@ -178,6 +186,7 @@ namespace Components void Node::DeleteInvalidNodes() { + std::lock_guard _(Node::NodeMutex); std::vector cleanNodes; for (auto node : Node::Nodes) @@ -204,6 +213,7 @@ namespace Components void Node::SyncNodeList() { + std::lock_guard _(Node::NodeMutex); for (auto& node : Node::Nodes) { if (node.state == Node::STATE_VALID && node.registered) @@ -254,62 +264,65 @@ namespace Components int registerCount = 0; int listQueryCount = 0; - for (auto &node : Node::Nodes) { - // TODO: Decide how to handle nodes that were already registered, but timed out re-registering. - if (node.state == STATE_NEGOTIATING && (Game::Sys_Milliseconds() - node.lastTime) > (NODE_QUERY_TIMEOUT)) + std::lock_guard _(Node::NodeMutex); + for (auto &node : Node::Nodes) { - node.registered = false; // Definitely unregister here! - node.state = Node::STATE_INVALID; - node.lastHeard = Game::Sys_Milliseconds(); - node.lastTime = Game::Sys_Milliseconds(); + // TODO: Decide how to handle nodes that were already registered, but timed out re-registering. + if (node.state == STATE_NEGOTIATING && (Game::Sys_Milliseconds() - node.lastTime) > (NODE_QUERY_TIMEOUT)) + { + node.registered = false; // Definitely unregister here! + node.state = Node::STATE_INVALID; + node.lastHeard = Game::Sys_Milliseconds(); + node.lastTime = Game::Sys_Milliseconds(); #if defined(DEBUG) && !defined(DISABLE_NODE_LOG) - Logger::Print("Node negotiation timed out. Invalidating %s\n", node.address.GetCString()); + Logger::Print("Node negotiation timed out. Invalidating %s\n", node.address.GetCString()); #endif - } - - if (registerCount < NODE_FRAME_QUERY_LIMIT) - { - // Register when unregistered and in UNKNOWN state (I doubt it's possible to be unregistered and in VALID state) - if (!node.registered && (node.state != Node::STATE_NEGOTIATING && node.state != Node::STATE_INVALID)) - { - ++registerCount; - node.state = Node::STATE_NEGOTIATING; - Node::PerformRegistration(node.address); } - // Requery invalid nodes within the NODE_QUERY_INTERVAL - // This is required, as a node might crash, which causes it to be invalid. - // If it's restarted though, we wouldn't query it again. - // But wouldn't it send a registration request to us? - // Not sure if the code below is necessary... - // Well, it might be possible that this node doesn't know use anymore. Anyways, just keep that code here... - - // Nvm, this is required for clients, as nodes don't send registration requests to clients. - else if (node.state == STATE_INVALID && (Game::Sys_Milliseconds() - node.lastTime) > NODE_QUERY_INTERVAL) + if (registerCount < NODE_FRAME_QUERY_LIMIT) { - ++registerCount; - Node::PerformRegistration(node.address); - } - } - - if (listQueryCount < NODE_FRAME_QUERY_LIMIT) - { - if (node.registered && node.state == Node::STATE_VALID && (!node.lastListQuery || (Game::Sys_Milliseconds() - node.lastListQuery) > NODE_QUERY_INTERVAL)) - { - ++listQueryCount; - node.state = Node::STATE_NEGOTIATING; - node.lastTime = Game::Sys_Milliseconds(); - node.lastListQuery = Game::Sys_Milliseconds(); - - if (Dedicated::IsEnabled()) + // Register when unregistered and in UNKNOWN state (I doubt it's possible to be unregistered and in VALID state) + if (!node.registered && (node.state != Node::STATE_NEGOTIATING && node.state != Node::STATE_INVALID)) { - Network::SendCommand(node.address, "nodeListRequest"); + ++registerCount; + node.state = Node::STATE_NEGOTIATING; + Node::PerformRegistration(node.address); } - else + // Requery invalid nodes within the NODE_QUERY_INTERVAL + // This is required, as a node might crash, which causes it to be invalid. + // If it's restarted though, we wouldn't query it again. + + // But wouldn't it send a registration request to us? + // Not sure if the code below is necessary... + // Well, it might be possible that this node doesn't know use anymore. Anyways, just keep that code here... + + // Nvm, this is required for clients, as nodes don't send registration requests to clients. + else if (node.state == STATE_INVALID && (Game::Sys_Milliseconds() - node.lastTime) > NODE_QUERY_INTERVAL) { - Network::SendCommand(node.address, "sessionRequest"); + ++registerCount; + Node::PerformRegistration(node.address); + } + } + + if (listQueryCount < NODE_FRAME_QUERY_LIMIT) + { + if (node.registered && node.state == Node::STATE_VALID && (!node.lastListQuery || (Game::Sys_Milliseconds() - node.lastListQuery) > NODE_QUERY_INTERVAL)) + { + ++listQueryCount; + node.state = Node::STATE_NEGOTIATING; + node.lastTime = Game::Sys_Milliseconds(); + node.lastListQuery = Game::Sys_Milliseconds(); + + if (Dedicated::IsEnabled()) + { + Network::SendCommand(node.address, "nodeListRequest"); + } + else + { + Network::SendCommand(node.address, "sessionRequest"); + } } } } @@ -374,6 +387,7 @@ namespace Components packet.set_challenge(challenge); packet.set_signature(Utils::Cryptography::ECC::SignMessage(Node::SignatureKey, challenge)); + std::lock_guard _(Node::NodeMutex); for (auto node : Node::Nodes) { Network::SendCommand(node.address, "nodeDeregister", packet.SerializeAsString()); @@ -386,16 +400,16 @@ namespace Components { if (Dvar::Var("sv_lanOnly").Get()) return; - Node::NodeEntry* entry = Node::FindNode(address); - // Create a new entry, if we don't already know it - if (!entry) + if (!Node::FindNode(address)) { Node::AddNode(address); - entry = Node::FindNode(address); - if (!entry) return; + if (!Node::FindNode(address)) return; } + std::lock_guard _(Node::NodeMutex); + Node::NodeEntry* entry = Node::FindNode(address); + #if defined(DEBUG) && !defined(DISABLE_NODE_LOG) Logger::Print("Received registration request from %s\n", address.GetCString()); #endif @@ -434,6 +448,7 @@ namespace Components { if (Dvar::Var("sv_lanOnly").Get()) return; + std::lock_guard _(Node::NodeMutex); Node::NodeEntry* entry = Node::FindNode(address); if (!entry || entry->state != Node::STATE_NEGOTIATING) return; @@ -485,6 +500,7 @@ namespace Components if (Dvar::Var("sv_lanOnly").Get()) return; // Ignore requests from nodes we don't know + std::lock_guard _(Node::NodeMutex); Node::NodeEntry* entry = Node::FindNode(address); if (!entry || entry->state != Node::STATE_NEGOTIATING) return; @@ -527,21 +543,26 @@ namespace Components // Check if this is a registered node bool allowed = false; - Node::NodeEntry* entry = Node::FindNode(address); - if (entry && entry->registered) - { - entry->lastTime = Game::Sys_Milliseconds(); - allowed = true; - } - // Check if there is any open session - if (!allowed) { - Node::ClientSession* session = Node::FindSession(address); - if (session) + std::lock_guard _(Node::NodeMutex); + Node::NodeEntry* entry = Node::FindNode(address); + if (entry && entry->registered) { - session->lastTime = Game::Sys_Milliseconds(); - allowed = session->valid; + entry->lastTime = Game::Sys_Milliseconds(); + allowed = true; + } + + // Check if there is any open session + if (!allowed) + { + std::lock_guard __(Node::SessionMutex); + Node::ClientSession* session = Node::FindSession(address); + if (session) + { + session->lastTime = Game::Sys_Milliseconds(); + allowed = session->valid; + } } } @@ -563,6 +584,7 @@ namespace Components { if (Dvar::Var("sv_lanOnly").Get()) return; + std::lock_guard _(Node::NodeMutex); Node::NodeEntry* entry = Node::FindNode(address); if (!entry || !entry->registered) return; @@ -598,6 +620,7 @@ namespace Components if (Dvar::Var("sv_lanOnly").Get()) return; // Search an active session, if we haven't found one, register a template + std::lock_guard _(Node::SessionMutex); if (!Node::FindSession(address)) { Node::ClientSession templateSession; @@ -626,6 +649,7 @@ namespace Components if (Dvar::Var("sv_lanOnly").Get()) return; // Return if we don't have a session for this address + std::lock_guard _(Node::SessionMutex); Node::ClientSession* session = Node::FindSession(address); if (!session || session->valid) return; @@ -650,6 +674,7 @@ namespace Components { Network::Handle("sessionInitialize", [] (Network::Address address, std::string data) { + std::lock_guard _(Node::NodeMutex); Node::NodeEntry* entry = Node::FindNode(address); if (!entry) return; @@ -663,12 +688,15 @@ namespace Components Network::Handle("sessionAcknowledge", [] (Network::Address address, std::string data) { - Node::NodeEntry* entry = Node::FindNode(address); - if (!entry) return; + { + std::lock_guard _(Node::NodeMutex); + Node::NodeEntry* entry = Node::FindNode(address); + if (!entry) return; - entry->state = Node::STATE_VALID; - entry->registered = true; - entry->lastTime = Game::Sys_Milliseconds(); + entry->state = Node::STATE_VALID; + entry->registered = true; + entry->lastTime = Game::Sys_Milliseconds(); + } #if defined(DEBUG) && !defined(DISABLE_NODE_LOG) Logger::Print("Session acknowledged by %s, synchronizing node list...\n", address.GetCString()); @@ -690,6 +718,7 @@ namespace Components return; } + Node::NodeMutex.lock(); Node::NodeEntry* entry = Node::FindNode(address); if (entry) { @@ -705,11 +734,21 @@ namespace Components entry->state = Node::STATE_VALID; entry->lastTime = Game::Sys_Milliseconds(); + // Block old versions +// if (entry->version < NODE_VERSION) +// { +// entry->state = Node::STATE_INVALID; +// Node::NodeMutex.unlock(); +// return; +// } + if (!Dedicated::IsEnabled() && entry->isDedi && ServerList::IsOnlineList() && entry->protocol == PROTOCOL) { ServerList::InsertRequest(entry->address, true); } + Node::NodeMutex.unlock(); + for (int i = 0; i < list.address_size(); ++i) { Network::Address _addr(list.address(i)); @@ -730,11 +769,17 @@ namespace Components Node::AddNode(_addr); } } + else + { + Node::NodeMutex.unlock(); + } } else { + Node::NodeMutex.unlock(); //Node::AddNode(address); + std::lock_guard _(Node::SessionMutex); Node::ClientSession* session = Node::FindSession(address); if (session && session->valid) { @@ -754,6 +799,7 @@ namespace Components { if (Dedicated::IsEnabled()) { + Node::NodeMutex.lock(); Node::NodeEntry* entry = Node::FindNode(address); if (entry) { @@ -761,9 +807,12 @@ namespace Components entry->lastTime = Game::Sys_Milliseconds(); entry->registered = false; entry->state = Node::STATE_UNKNOWN; + Node::NodeMutex.unlock(); } else { + Node::NodeMutex.unlock(); + // Add as new entry to perform registration Node::AddNode(address); } @@ -774,6 +823,7 @@ namespace Components { Logger::Print("Nodes: %d (%d)\n", Node::Nodes.size(), Node::GetValidNodeCount()); + std::lock_guard _(Node::NodeMutex); for (auto node : Node::Nodes) { Logger::Print("%s\t(%s)\n", node.address.GetCString(), Node::GetStateName(node.state)); @@ -787,6 +837,7 @@ namespace Components Network::Address address(params[1]); Node::AddNode(address); + std::lock_guard _(Node::NodeMutex); Node::NodeEntry* entry = Node::FindNode(address); if (entry) { @@ -799,6 +850,7 @@ namespace Components { Logger::Print("Re-Synchronizing nodes...\n"); + std::lock_guard _(Node::NodeMutex); for (auto& node : Node::Nodes) { node.state = Node::STATE_UNKNOWN; @@ -817,6 +869,8 @@ namespace Components Node::SignatureKey.Free(); Node::StoreNodes(true); + std::lock_guard _(Node::NodeMutex); + std::lock_guard __(Node::SessionMutex); Node::Nodes.clear(); Node::Sessions.clear(); } diff --git a/src/Components/Modules/Node.hpp b/src/Components/Modules/Node.hpp index 14c4eb24..4ccaf470 100644 --- a/src/Components/Modules/Node.hpp +++ b/src/Components/Modules/Node.hpp @@ -7,7 +7,7 @@ #define NODE_STORE_INTERVAL 1000 * 60* 1 // Store nodes every minute #define SESSION_TIMEOUT 1000 * 10 // 10 seconds session timeout -#define NODE_VERSION 1 +#define NODE_VERSION 2 namespace Components { @@ -69,6 +69,8 @@ namespace Components static Utils::Cryptography::ECC::Key SignatureKey; + static std::mutex NodeMutex; + static std::mutex SessionMutex; static std::vector Nodes; static std::vector Sessions;