From c85538eb239ddc57c6f7d9637e394183e47fa293 Mon Sep 17 00:00:00 2001 From: Diavolo Date: Sat, 17 Jun 2023 11:46:20 +0200 Subject: [PATCH] feature(nodes): migrate to JSON node list --- src/Components/Modules/Chat.cpp | 2 +- src/Components/Modules/Node.cpp | 188 ++++++++++++++++++++++---------- src/Components/Modules/Node.hpp | 4 +- 3 files changed, 132 insertions(+), 62 deletions(-) diff --git a/src/Components/Modules/Chat.cpp b/src/Components/Modules/Chat.cpp index 7e0b0290..fe659ec8 100644 --- a/src/Components/Modules/Chat.cpp +++ b/src/Components/Modules/Chat.cpp @@ -371,7 +371,7 @@ namespace Components MutedList.access([&](muteList& clients) { const nlohmann::json::array_t arr = list; - for (auto& entry : arr) + for (const auto& entry : arr) { if (entry.is_number_unsigned()) { diff --git a/src/Components/Modules/Node.cpp b/src/Components/Modules/Node.cpp index b922e249..7d55671f 100644 --- a/src/Components/Modules/Node.cpp +++ b/src/Components/Modules/Node.cpp @@ -48,7 +48,7 @@ namespace Components this->lastRequest->update(); Session::Send(this->address, "nodeListRequest"); - Node::SendList(this->address); + SendList(this->address); #ifdef NODE_SYSTEM_DEBUG Logger::Debug("Sent request to {}", this->address.getString()); #endif @@ -56,7 +56,6 @@ namespace Components void Node::Entry::reset() { - // this->lastResponse.reset(); // This would invalidate the node, but maybe we don't want that? this->lastRequest.reset(); } @@ -69,28 +68,54 @@ namespace Components for (auto i = 0; i < list.nodes_size(); ++i) { - const std::string& addr = list.nodes(i); - + const auto& addr = list.nodes(i); if (addr.size() == sizeof(sockaddr)) { - Node::Add(reinterpret_cast(const_cast(addr.data()))); + Add(reinterpret_cast(const_cast(addr.data()))); } } } void Node::LoadNodes() { - Proto::Node::List list; - std::string nodes = Utils::IO::ReadFile("players/nodes.dat"); - if (nodes.empty() || !list.ParseFromString(Utils::Compression::ZLib::Decompress(nodes))) return; - - for (int i = 0; i < list.nodes_size(); ++i) + std::string data; + if (!Utils::IO::ReadFile("players/nodes.json", &data) || data.empty()) { - const std::string& addr = list.nodes(i); + return; + } - if (addr.size() == sizeof(sockaddr)) + nlohmann::json nodes; + try + { + nodes = nlohmann::json::parse(data); + } + catch (const std::exception& ex) + { + Logger::PrintError(Game::CON_CHANNEL_ERROR, "JSON Parse Error: {}\n", ex.what()); + return; + } + + if (!nodes.contains("nodes")) + { + Logger::PrintError(Game::CON_CHANNEL_ERROR, "nodes.json contains invalid data\n"); + return; + } + + const auto& list = nodes["nodes"]; + if (!list.is_array()) + { + return; + } + + const nlohmann::json::array_t arr = list; + Logger::Print("Parsing {} nodes from nodes.json\n", arr.size()); + + for (const auto& entry : arr) + { + if (entry.is_string()) { - Node::Add(reinterpret_cast(const_cast(addr.data()))); + Network::Address address(entry.get()); + Add(address); } } } @@ -99,26 +124,29 @@ namespace Components { if (Dedicated::IsEnabled() && Dedicated::SVLanOnly.get()) return; + std::vector nodes; + static Utils::Time::Interval interval; if (!force && !interval.elapsed(1min)) return; interval.update(); - Proto::Node::List list; + Mutex.lock(); - Node::Mutex.lock(); - for (auto& node : Node::Nodes) + for (auto& node : Nodes) { - if (node.isValid()) + if (node.isValid() || force) { - std::string* str = list.add_nodes(); - - sockaddr addr = node.address.getSockAddr(); - str->append(reinterpret_cast(&addr), sizeof(addr)); + const auto address = node.address.getString(); + nodes.emplace_back(address); } } - Node::Mutex.unlock(); - Utils::IO::WriteFile("players/nodes.dat", Utils::Compression::ZLib::Compress(list.SerializeAsString())); + Mutex.unlock(); + + nlohmann::json out; + out["nodes"] = nodes; + + Utils::IO::WriteFile("players/nodes.json", out.dump()); } void Node::Add(Network::Address address) @@ -129,23 +157,23 @@ namespace Components if (!address.isValid()) return; - std::lock_guard _(Node::Mutex); - for (auto& session : Node::Nodes) + std::lock_guard _(Mutex); + for (auto& session : Nodes) { if (session.address == address) return; } - Node::Entry node; + Entry node; node.address = address; - Node::Nodes.push_back(node); + Nodes.push_back(node); } std::vector Node::GetNodes() { - std::lock_guard _(Node::Mutex); + std::lock_guard _(Mutex); - return Node::Nodes; + return Nodes; } void Node::RunFrame() @@ -165,7 +193,7 @@ namespace Components if (WasIngame) // our last frame we were in-game and now we aren't so touch all nodes { - for (auto i = Node::Nodes.begin(); i != Node::Nodes.end();++i) + for (auto i = Nodes.begin(); i != Nodes.end();++i) { // clearing the last request and response times makes the // dispatcher think its a new node and will force a refresh @@ -181,15 +209,15 @@ namespace Components if (!frameLimit.elapsed(std::chrono::milliseconds(interval))) return; frameLimit.update(); - std::lock_guard _(Node::Mutex); + std::lock_guard _(Mutex); Dvar::Var queryLimit("net_serverQueryLimit"); int sentRequests = 0; - for (auto i = Node::Nodes.begin(); i != Node::Nodes.end();) + for (auto i = Nodes.begin(); i != Nodes.end();) { if (i->isDead()) { - i = Node::Nodes.erase(i); + i = Nodes.erase(i); continue; } if (sentRequests < queryLimit.get() && i->requiresRequest()) @@ -204,10 +232,9 @@ namespace Components void Node::Synchronize() { - std::lock_guard _(Node::Mutex); - for (auto& node : Node::Nodes) + std::lock_guard _(Mutex); + for (auto& node : Nodes) { - //if (node.isValid()) // Comment out to simulate 'syncnodes' behaviour { node.reset(); } @@ -223,7 +250,7 @@ namespace Components Logger::Debug("Received response from {}", address.getString()); #endif - std::lock_guard _(Node::Mutex); + std::lock_guard _(Mutex); for (int i = 0; i < list.nodes_size(); ++i) { @@ -231,7 +258,7 @@ namespace Components if (addr.size() == sizeof(sockaddr)) { - Node::Add(reinterpret_cast(const_cast(addr.data()))); + Add(reinterpret_cast(const_cast(addr.data()))); } } @@ -251,7 +278,7 @@ namespace Components #endif } - for (auto& node : Node::Nodes) + for (auto& node : Nodes) { if (address == node.address) { @@ -263,35 +290,37 @@ namespace Components } } - Node::Entry entry; + Entry entry; entry.address = address; entry.data.protocol = list.protocol(); entry.lastResponse.emplace(Utils::Time::Point()); - Node::Nodes.push_back(entry); + Nodes.push_back(entry); } } void Node::SendList(const Network::Address& address) { - std::lock_guard _(Node::Mutex); + std::lock_guard _(Mutex); // need to keep the message size below 1404 bytes else recipient will just drop it std::vector nodeListReponseMessages; - for (std::size_t curNode = 0; curNode < Node::Nodes.size();) + for (std::size_t curNode = 0; curNode < Nodes.size();) { Proto::Node::List list; list.set_isnode(Dedicated::IsEnabled()); list.set_protocol(PROTOCOL); - list.set_port(Node::GetPort()); + list.set_port(GetPort()); for (std::size_t i = 0; i < NODE_MAX_NODES_TO_SEND;) { - if (curNode >= Node::Nodes.size()) + if (curNode >= Nodes.size()) + { break; + } - auto node = Node::Nodes.at(curNode++); + auto node = Nodes.at(curNode++); if (node.isValid()) { @@ -326,6 +355,44 @@ namespace Components return Network::GetPort(); } + void Node::Migrate() + { + Proto::Node::List list; + std::string nodes; + + if (!Utils::IO::ReadFile("players/nodes.dat", &nodes) || nodes.empty()) + { + return; + } + + if (!list.ParseFromString(Utils::Compression::ZLib::Decompress(nodes))) + { + return; + } + + std::vector data; + for (auto i = 0; i < list.nodes_size(); ++i) + { + const std::string& addr = list.nodes(i); + + if (addr.size() == sizeof(sockaddr)) + { + Network::Address address(reinterpret_cast(const_cast(addr.data()))); + data.emplace_back(address.getString()); + } + } + + nlohmann::json out; + out["nodes"] = data; + + if (!Utils::IO::FileExists("players/nodes.json")) + { + Utils::IO::WriteFile("players/nodes.json", out.dump()); + } + + Utils::IO::RemoveFile("players/nodes.dat"); + } + Node::Node() { if (ZoneBuilder::IsEnabled()) return; @@ -333,32 +400,33 @@ namespace Components Scheduler::Loop([] { - Node::StoreNodes(false); + StoreNodes(false); }, Scheduler::Pipeline::ASYNC); - Scheduler::Loop(Node::RunFrame, Scheduler::Pipeline::MAIN); + Scheduler::Loop(RunFrame, Scheduler::Pipeline::MAIN); - Session::Handle("nodeListResponse", Node::HandleResponse); + Session::Handle("nodeListResponse", HandleResponse); Session::Handle("nodeListRequest", [](const Network::Address& address, [[maybe_unused]] const std::string& data) { - Node::SendList(address); + SendList(address); }); // Load stored nodes auto loadNodes = [] { - Node::LoadNodePreset(); - Node::LoadNodes(); + Migrate(); + LoadNodePreset(); + LoadNodes(); }; Scheduler::OnGameInitialized(loadNodes, Scheduler::Pipeline::MAIN); Command::Add("listNodes", [](const Command::Params*) { - Logger::Print("Nodes: {}\n", Node::Nodes.size()); + Logger::Print("Nodes: {}\n", Nodes.size()); - std::lock_guard _(Node::Mutex); - for (const auto& node : Node::Nodes) + std::lock_guard _(Mutex); + for (const auto& node : Nodes) { Logger::Print("{}\t({})\n", node.address.getString(), node.isValid() ? "Valid" : "Invalid"); } @@ -370,15 +438,15 @@ namespace Components auto address = Network::Address{ params->get(1) }; if (address.isValid()) { - Node::Add(address); + Add(address); } }); } - Node::~Node() + void Node::preDestroy() { - std::lock_guard _(Node::Mutex); - Node::StoreNodes(true); - Node::Nodes.clear(); + std::lock_guard _(Mutex); + StoreNodes(true); + Nodes.clear(); } } diff --git a/src/Components/Modules/Node.hpp b/src/Components/Modules/Node.hpp index b585df3f..d76cdd20 100644 --- a/src/Components/Modules/Node.hpp +++ b/src/Components/Modules/Node.hpp @@ -34,7 +34,7 @@ namespace Components }; Node(); - ~Node(); + void preDestroy() override; static void Add(Network::Address address); static std::vector GetNodes(); @@ -55,5 +55,7 @@ namespace Components static void StoreNodes(bool force); static unsigned short GetPort(); + + static void Migrate(); }; }