Add thread synchronization for nodes
This commit is contained in:
parent
63331b9bf7
commit
e29377580b
@ -2,6 +2,8 @@
|
||||
|
||||
namespace Components
|
||||
{
|
||||
std::mutex Node::NodeMutex;
|
||||
std::mutex Node::SessionMutex;
|
||||
Utils::Cryptography::ECC::Key Node::SignatureKey;
|
||||
std::vector<Node::NodeEntry> Node::Nodes;
|
||||
std::vector<Node::ClientSession> Node::Sessions;
|
||||
@ -45,6 +47,7 @@ namespace Components
|
||||
// However, defining another proto message due to this would be redundant.
|
||||
//list.set_is_dedi(Dedicated::IsDedicated());
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
for (auto node : Node::Nodes)
|
||||
{
|
||||
if (node.state == Node::STATE_VALID && node.registered)
|
||||
@ -88,6 +91,7 @@ namespace Components
|
||||
unsigned int Node::GetValidNodeCount()
|
||||
{
|
||||
unsigned int count = 0;
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
|
||||
for (auto node : Node::Nodes)
|
||||
{
|
||||
@ -108,6 +112,7 @@ namespace Components
|
||||
if (!address.IsValid() || address.IsLocal() || address.IsSelf()) return;
|
||||
#endif
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* existingEntry = Node::FindNode(address);
|
||||
if (existingEntry)
|
||||
{
|
||||
@ -142,7 +147,9 @@ namespace Components
|
||||
list.set_protocol(PROTOCOL);
|
||||
list.set_version(NODE_VERSION);
|
||||
|
||||
for (auto node : Node::Nodes)
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
|
||||
for (auto& node : Node::Nodes)
|
||||
{
|
||||
if (node.state == Node::STATE_VALID && node.registered)
|
||||
{
|
||||
@ -163,6 +170,7 @@ namespace Components
|
||||
|
||||
void Node::DeleteInvalidSessions()
|
||||
{
|
||||
std::lock_guard<std::mutex> _(Node::SessionMutex);
|
||||
for (auto i = Node::Sessions.begin(); i != Node::Sessions.end();)
|
||||
{
|
||||
if (i->lastTime <= 0 || (Game::Sys_Milliseconds() - i->lastTime) > SESSION_TIMEOUT)
|
||||
@ -178,6 +186,7 @@ namespace Components
|
||||
|
||||
void Node::DeleteInvalidNodes()
|
||||
{
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
std::vector<Node::NodeEntry> cleanNodes;
|
||||
|
||||
for (auto node : Node::Nodes)
|
||||
@ -204,6 +213,7 @@ namespace Components
|
||||
|
||||
void Node::SyncNodeList()
|
||||
{
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
for (auto& node : Node::Nodes)
|
||||
{
|
||||
if (node.state == Node::STATE_VALID && node.registered)
|
||||
@ -254,62 +264,65 @@ namespace Components
|
||||
int registerCount = 0;
|
||||
int listQueryCount = 0;
|
||||
|
||||
for (auto &node : Node::Nodes)
|
||||
{
|
||||
// TODO: Decide how to handle nodes that were already registered, but timed out re-registering.
|
||||
if (node.state == STATE_NEGOTIATING && (Game::Sys_Milliseconds() - node.lastTime) > (NODE_QUERY_TIMEOUT))
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
for (auto &node : Node::Nodes)
|
||||
{
|
||||
node.registered = false; // Definitely unregister here!
|
||||
node.state = Node::STATE_INVALID;
|
||||
node.lastHeard = Game::Sys_Milliseconds();
|
||||
node.lastTime = Game::Sys_Milliseconds();
|
||||
// TODO: Decide how to handle nodes that were already registered, but timed out re-registering.
|
||||
if (node.state == STATE_NEGOTIATING && (Game::Sys_Milliseconds() - node.lastTime) > (NODE_QUERY_TIMEOUT))
|
||||
{
|
||||
node.registered = false; // Definitely unregister here!
|
||||
node.state = Node::STATE_INVALID;
|
||||
node.lastHeard = Game::Sys_Milliseconds();
|
||||
node.lastTime = Game::Sys_Milliseconds();
|
||||
|
||||
#if defined(DEBUG) && !defined(DISABLE_NODE_LOG)
|
||||
Logger::Print("Node negotiation timed out. Invalidating %s\n", node.address.GetCString());
|
||||
Logger::Print("Node negotiation timed out. Invalidating %s\n", node.address.GetCString());
|
||||
#endif
|
||||
}
|
||||
|
||||
if (registerCount < NODE_FRAME_QUERY_LIMIT)
|
||||
{
|
||||
// Register when unregistered and in UNKNOWN state (I doubt it's possible to be unregistered and in VALID state)
|
||||
if (!node.registered && (node.state != Node::STATE_NEGOTIATING && node.state != Node::STATE_INVALID))
|
||||
{
|
||||
++registerCount;
|
||||
node.state = Node::STATE_NEGOTIATING;
|
||||
Node::PerformRegistration(node.address);
|
||||
}
|
||||
// Requery invalid nodes within the NODE_QUERY_INTERVAL
|
||||
// This is required, as a node might crash, which causes it to be invalid.
|
||||
// If it's restarted though, we wouldn't query it again.
|
||||
|
||||
// But wouldn't it send a registration request to us?
|
||||
// Not sure if the code below is necessary...
|
||||
// Well, it might be possible that this node doesn't know use anymore. Anyways, just keep that code here...
|
||||
|
||||
// Nvm, this is required for clients, as nodes don't send registration requests to clients.
|
||||
else if (node.state == STATE_INVALID && (Game::Sys_Milliseconds() - node.lastTime) > NODE_QUERY_INTERVAL)
|
||||
if (registerCount < NODE_FRAME_QUERY_LIMIT)
|
||||
{
|
||||
++registerCount;
|
||||
Node::PerformRegistration(node.address);
|
||||
}
|
||||
}
|
||||
|
||||
if (listQueryCount < NODE_FRAME_QUERY_LIMIT)
|
||||
{
|
||||
if (node.registered && node.state == Node::STATE_VALID && (!node.lastListQuery || (Game::Sys_Milliseconds() - node.lastListQuery) > NODE_QUERY_INTERVAL))
|
||||
{
|
||||
++listQueryCount;
|
||||
node.state = Node::STATE_NEGOTIATING;
|
||||
node.lastTime = Game::Sys_Milliseconds();
|
||||
node.lastListQuery = Game::Sys_Milliseconds();
|
||||
|
||||
if (Dedicated::IsEnabled())
|
||||
// Register when unregistered and in UNKNOWN state (I doubt it's possible to be unregistered and in VALID state)
|
||||
if (!node.registered && (node.state != Node::STATE_NEGOTIATING && node.state != Node::STATE_INVALID))
|
||||
{
|
||||
Network::SendCommand(node.address, "nodeListRequest");
|
||||
++registerCount;
|
||||
node.state = Node::STATE_NEGOTIATING;
|
||||
Node::PerformRegistration(node.address);
|
||||
}
|
||||
else
|
||||
// Requery invalid nodes within the NODE_QUERY_INTERVAL
|
||||
// This is required, as a node might crash, which causes it to be invalid.
|
||||
// If it's restarted though, we wouldn't query it again.
|
||||
|
||||
// But wouldn't it send a registration request to us?
|
||||
// Not sure if the code below is necessary...
|
||||
// Well, it might be possible that this node doesn't know use anymore. Anyways, just keep that code here...
|
||||
|
||||
// Nvm, this is required for clients, as nodes don't send registration requests to clients.
|
||||
else if (node.state == STATE_INVALID && (Game::Sys_Milliseconds() - node.lastTime) > NODE_QUERY_INTERVAL)
|
||||
{
|
||||
Network::SendCommand(node.address, "sessionRequest");
|
||||
++registerCount;
|
||||
Node::PerformRegistration(node.address);
|
||||
}
|
||||
}
|
||||
|
||||
if (listQueryCount < NODE_FRAME_QUERY_LIMIT)
|
||||
{
|
||||
if (node.registered && node.state == Node::STATE_VALID && (!node.lastListQuery || (Game::Sys_Milliseconds() - node.lastListQuery) > NODE_QUERY_INTERVAL))
|
||||
{
|
||||
++listQueryCount;
|
||||
node.state = Node::STATE_NEGOTIATING;
|
||||
node.lastTime = Game::Sys_Milliseconds();
|
||||
node.lastListQuery = Game::Sys_Milliseconds();
|
||||
|
||||
if (Dedicated::IsEnabled())
|
||||
{
|
||||
Network::SendCommand(node.address, "nodeListRequest");
|
||||
}
|
||||
else
|
||||
{
|
||||
Network::SendCommand(node.address, "sessionRequest");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -374,6 +387,7 @@ namespace Components
|
||||
packet.set_challenge(challenge);
|
||||
packet.set_signature(Utils::Cryptography::ECC::SignMessage(Node::SignatureKey, challenge));
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
for (auto node : Node::Nodes)
|
||||
{
|
||||
Network::SendCommand(node.address, "nodeDeregister", packet.SerializeAsString());
|
||||
@ -386,16 +400,16 @@ namespace Components
|
||||
{
|
||||
if (Dvar::Var("sv_lanOnly").Get<bool>()) return;
|
||||
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
|
||||
// Create a new entry, if we don't already know it
|
||||
if (!entry)
|
||||
if (!Node::FindNode(address))
|
||||
{
|
||||
Node::AddNode(address);
|
||||
entry = Node::FindNode(address);
|
||||
if (!entry) return;
|
||||
if (!Node::FindNode(address)) return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
|
||||
#if defined(DEBUG) && !defined(DISABLE_NODE_LOG)
|
||||
Logger::Print("Received registration request from %s\n", address.GetCString());
|
||||
#endif
|
||||
@ -434,6 +448,7 @@ namespace Components
|
||||
{
|
||||
if (Dvar::Var("sv_lanOnly").Get<bool>()) return;
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (!entry || entry->state != Node::STATE_NEGOTIATING) return;
|
||||
|
||||
@ -485,6 +500,7 @@ namespace Components
|
||||
if (Dvar::Var("sv_lanOnly").Get<bool>()) return;
|
||||
|
||||
// Ignore requests from nodes we don't know
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (!entry || entry->state != Node::STATE_NEGOTIATING) return;
|
||||
|
||||
@ -527,21 +543,26 @@ namespace Components
|
||||
|
||||
// Check if this is a registered node
|
||||
bool allowed = false;
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (entry && entry->registered)
|
||||
{
|
||||
entry->lastTime = Game::Sys_Milliseconds();
|
||||
allowed = true;
|
||||
}
|
||||
|
||||
// Check if there is any open session
|
||||
if (!allowed)
|
||||
{
|
||||
Node::ClientSession* session = Node::FindSession(address);
|
||||
if (session)
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (entry && entry->registered)
|
||||
{
|
||||
session->lastTime = Game::Sys_Milliseconds();
|
||||
allowed = session->valid;
|
||||
entry->lastTime = Game::Sys_Milliseconds();
|
||||
allowed = true;
|
||||
}
|
||||
|
||||
// Check if there is any open session
|
||||
if (!allowed)
|
||||
{
|
||||
std::lock_guard<std::mutex> __(Node::SessionMutex);
|
||||
Node::ClientSession* session = Node::FindSession(address);
|
||||
if (session)
|
||||
{
|
||||
session->lastTime = Game::Sys_Milliseconds();
|
||||
allowed = session->valid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -563,6 +584,7 @@ namespace Components
|
||||
{
|
||||
if (Dvar::Var("sv_lanOnly").Get<bool>()) return;
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (!entry || !entry->registered) return;
|
||||
|
||||
@ -598,6 +620,7 @@ namespace Components
|
||||
if (Dvar::Var("sv_lanOnly").Get<bool>()) return;
|
||||
|
||||
// Search an active session, if we haven't found one, register a template
|
||||
std::lock_guard<std::mutex> _(Node::SessionMutex);
|
||||
if (!Node::FindSession(address))
|
||||
{
|
||||
Node::ClientSession templateSession;
|
||||
@ -626,6 +649,7 @@ namespace Components
|
||||
if (Dvar::Var("sv_lanOnly").Get<bool>()) return;
|
||||
|
||||
// Return if we don't have a session for this address
|
||||
std::lock_guard<std::mutex> _(Node::SessionMutex);
|
||||
Node::ClientSession* session = Node::FindSession(address);
|
||||
if (!session || session->valid) return;
|
||||
|
||||
@ -650,6 +674,7 @@ namespace Components
|
||||
{
|
||||
Network::Handle("sessionInitialize", [] (Network::Address address, std::string data)
|
||||
{
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (!entry) return;
|
||||
|
||||
@ -663,12 +688,15 @@ namespace Components
|
||||
|
||||
Network::Handle("sessionAcknowledge", [] (Network::Address address, std::string data)
|
||||
{
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (!entry) return;
|
||||
{
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (!entry) return;
|
||||
|
||||
entry->state = Node::STATE_VALID;
|
||||
entry->registered = true;
|
||||
entry->lastTime = Game::Sys_Milliseconds();
|
||||
entry->state = Node::STATE_VALID;
|
||||
entry->registered = true;
|
||||
entry->lastTime = Game::Sys_Milliseconds();
|
||||
}
|
||||
|
||||
#if defined(DEBUG) && !defined(DISABLE_NODE_LOG)
|
||||
Logger::Print("Session acknowledged by %s, synchronizing node list...\n", address.GetCString());
|
||||
@ -690,6 +718,7 @@ namespace Components
|
||||
return;
|
||||
}
|
||||
|
||||
Node::NodeMutex.lock();
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (entry)
|
||||
{
|
||||
@ -705,11 +734,21 @@ namespace Components
|
||||
entry->state = Node::STATE_VALID;
|
||||
entry->lastTime = Game::Sys_Milliseconds();
|
||||
|
||||
// Block old versions
|
||||
// if (entry->version < NODE_VERSION)
|
||||
// {
|
||||
// entry->state = Node::STATE_INVALID;
|
||||
// Node::NodeMutex.unlock();
|
||||
// return;
|
||||
// }
|
||||
|
||||
if (!Dedicated::IsEnabled() && entry->isDedi && ServerList::IsOnlineList() && entry->protocol == PROTOCOL)
|
||||
{
|
||||
ServerList::InsertRequest(entry->address, true);
|
||||
}
|
||||
|
||||
Node::NodeMutex.unlock();
|
||||
|
||||
for (int i = 0; i < list.address_size(); ++i)
|
||||
{
|
||||
Network::Address _addr(list.address(i));
|
||||
@ -730,11 +769,17 @@ namespace Components
|
||||
Node::AddNode(_addr);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Node::NodeMutex.unlock();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Node::NodeMutex.unlock();
|
||||
//Node::AddNode(address);
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::SessionMutex);
|
||||
Node::ClientSession* session = Node::FindSession(address);
|
||||
if (session && session->valid)
|
||||
{
|
||||
@ -754,6 +799,7 @@ namespace Components
|
||||
{
|
||||
if (Dedicated::IsEnabled())
|
||||
{
|
||||
Node::NodeMutex.lock();
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (entry)
|
||||
{
|
||||
@ -761,9 +807,12 @@ namespace Components
|
||||
entry->lastTime = Game::Sys_Milliseconds();
|
||||
entry->registered = false;
|
||||
entry->state = Node::STATE_UNKNOWN;
|
||||
Node::NodeMutex.unlock();
|
||||
}
|
||||
else
|
||||
{
|
||||
Node::NodeMutex.unlock();
|
||||
|
||||
// Add as new entry to perform registration
|
||||
Node::AddNode(address);
|
||||
}
|
||||
@ -774,6 +823,7 @@ namespace Components
|
||||
{
|
||||
Logger::Print("Nodes: %d (%d)\n", Node::Nodes.size(), Node::GetValidNodeCount());
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
for (auto node : Node::Nodes)
|
||||
{
|
||||
Logger::Print("%s\t(%s)\n", node.address.GetCString(), Node::GetStateName(node.state));
|
||||
@ -787,6 +837,7 @@ namespace Components
|
||||
Network::Address address(params[1]);
|
||||
Node::AddNode(address);
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
Node::NodeEntry* entry = Node::FindNode(address);
|
||||
if (entry)
|
||||
{
|
||||
@ -799,6 +850,7 @@ namespace Components
|
||||
{
|
||||
Logger::Print("Re-Synchronizing nodes...\n");
|
||||
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
for (auto& node : Node::Nodes)
|
||||
{
|
||||
node.state = Node::STATE_UNKNOWN;
|
||||
@ -817,6 +869,8 @@ namespace Components
|
||||
Node::SignatureKey.Free();
|
||||
|
||||
Node::StoreNodes(true);
|
||||
std::lock_guard<std::mutex> _(Node::NodeMutex);
|
||||
std::lock_guard<std::mutex> __(Node::SessionMutex);
|
||||
Node::Nodes.clear();
|
||||
Node::Sessions.clear();
|
||||
}
|
||||
|
@ -7,7 +7,7 @@
|
||||
#define NODE_STORE_INTERVAL 1000 * 60* 1 // Store nodes every minute
|
||||
#define SESSION_TIMEOUT 1000 * 10 // 10 seconds session timeout
|
||||
|
||||
#define NODE_VERSION 1
|
||||
#define NODE_VERSION 2
|
||||
|
||||
namespace Components
|
||||
{
|
||||
@ -69,6 +69,8 @@ namespace Components
|
||||
|
||||
static Utils::Cryptography::ECC::Key SignatureKey;
|
||||
|
||||
static std::mutex NodeMutex;
|
||||
static std::mutex SessionMutex;
|
||||
static std::vector<NodeEntry> Nodes;
|
||||
static std::vector<ClientSession> Sessions;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user