feature(nodes): migrate to JSON node list

This commit is contained in:
Diavolo 2023-06-17 11:46:20 +02:00
parent a6864ff725
commit c85538eb23
No known key found for this signature in database
GPG Key ID: FA77F074E98D98A5
3 changed files with 132 additions and 62 deletions

View File

@ -371,7 +371,7 @@ namespace Components
MutedList.access([&](muteList& clients) MutedList.access([&](muteList& clients)
{ {
const nlohmann::json::array_t arr = list; const nlohmann::json::array_t arr = list;
for (auto& entry : arr) for (const auto& entry : arr)
{ {
if (entry.is_number_unsigned()) if (entry.is_number_unsigned())
{ {

View File

@ -48,7 +48,7 @@ namespace Components
this->lastRequest->update(); this->lastRequest->update();
Session::Send(this->address, "nodeListRequest"); Session::Send(this->address, "nodeListRequest");
Node::SendList(this->address); SendList(this->address);
#ifdef NODE_SYSTEM_DEBUG #ifdef NODE_SYSTEM_DEBUG
Logger::Debug("Sent request to {}", this->address.getString()); Logger::Debug("Sent request to {}", this->address.getString());
#endif #endif
@ -56,7 +56,6 @@ namespace Components
void Node::Entry::reset() void Node::Entry::reset()
{ {
// this->lastResponse.reset(); // This would invalidate the node, but maybe we don't want that?
this->lastRequest.reset(); this->lastRequest.reset();
} }
@ -69,28 +68,54 @@ namespace Components
for (auto i = 0; i < list.nodes_size(); ++i) for (auto i = 0; i < list.nodes_size(); ++i)
{ {
const std::string& addr = list.nodes(i); const auto& addr = list.nodes(i);
if (addr.size() == sizeof(sockaddr)) if (addr.size() == sizeof(sockaddr))
{ {
Node::Add(reinterpret_cast<sockaddr*>(const_cast<char*>(addr.data()))); Add(reinterpret_cast<sockaddr*>(const_cast<char*>(addr.data())));
} }
} }
} }
void Node::LoadNodes() void Node::LoadNodes()
{ {
Proto::Node::List list; std::string data;
std::string nodes = Utils::IO::ReadFile("players/nodes.dat"); if (!Utils::IO::ReadFile("players/nodes.json", &data) || data.empty())
if (nodes.empty() || !list.ParseFromString(Utils::Compression::ZLib::Decompress(nodes))) return;
for (int i = 0; i < list.nodes_size(); ++i)
{ {
const std::string& addr = list.nodes(i); return;
}
if (addr.size() == sizeof(sockaddr)) nlohmann::json nodes;
try
{
nodes = nlohmann::json::parse(data);
}
catch (const std::exception& ex)
{
Logger::PrintError(Game::CON_CHANNEL_ERROR, "JSON Parse Error: {}\n", ex.what());
return;
}
if (!nodes.contains("nodes"))
{
Logger::PrintError(Game::CON_CHANNEL_ERROR, "nodes.json contains invalid data\n");
return;
}
const auto& list = nodes["nodes"];
if (!list.is_array())
{
return;
}
const nlohmann::json::array_t arr = list;
Logger::Print("Parsing {} nodes from nodes.json\n", arr.size());
for (const auto& entry : arr)
{
if (entry.is_string())
{ {
Node::Add(reinterpret_cast<sockaddr*>(const_cast<char*>(addr.data()))); Network::Address address(entry.get<std::string>());
Add(address);
} }
} }
} }
@ -99,26 +124,29 @@ namespace Components
{ {
if (Dedicated::IsEnabled() && Dedicated::SVLanOnly.get<bool>()) return; if (Dedicated::IsEnabled() && Dedicated::SVLanOnly.get<bool>()) return;
std::vector<std::string> nodes;
static Utils::Time::Interval interval; static Utils::Time::Interval interval;
if (!force && !interval.elapsed(1min)) return; if (!force && !interval.elapsed(1min)) return;
interval.update(); interval.update();
Proto::Node::List list; Mutex.lock();
Node::Mutex.lock(); for (auto& node : Nodes)
for (auto& node : Node::Nodes)
{ {
if (node.isValid()) if (node.isValid() || force)
{ {
std::string* str = list.add_nodes(); const auto address = node.address.getString();
nodes.emplace_back(address);
sockaddr addr = node.address.getSockAddr();
str->append(reinterpret_cast<char*>(&addr), sizeof(addr));
} }
} }
Node::Mutex.unlock();
Utils::IO::WriteFile("players/nodes.dat", Utils::Compression::ZLib::Compress(list.SerializeAsString())); Mutex.unlock();
nlohmann::json out;
out["nodes"] = nodes;
Utils::IO::WriteFile("players/nodes.json", out.dump());
} }
void Node::Add(Network::Address address) void Node::Add(Network::Address address)
@ -129,23 +157,23 @@ namespace Components
if (!address.isValid()) return; if (!address.isValid()) return;
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
for (auto& session : Node::Nodes) for (auto& session : Nodes)
{ {
if (session.address == address) return; if (session.address == address) return;
} }
Node::Entry node; Entry node;
node.address = address; node.address = address;
Node::Nodes.push_back(node); Nodes.push_back(node);
} }
std::vector<Node::Entry> Node::GetNodes() std::vector<Node::Entry> Node::GetNodes()
{ {
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
return Node::Nodes; return Nodes;
} }
void Node::RunFrame() void Node::RunFrame()
@ -165,7 +193,7 @@ namespace Components
if (WasIngame) // our last frame we were in-game and now we aren't so touch all nodes if (WasIngame) // our last frame we were in-game and now we aren't so touch all nodes
{ {
for (auto i = Node::Nodes.begin(); i != Node::Nodes.end();++i) for (auto i = Nodes.begin(); i != Nodes.end();++i)
{ {
// clearing the last request and response times makes the // clearing the last request and response times makes the
// dispatcher think its a new node and will force a refresh // dispatcher think its a new node and will force a refresh
@ -181,15 +209,15 @@ namespace Components
if (!frameLimit.elapsed(std::chrono::milliseconds(interval))) return; if (!frameLimit.elapsed(std::chrono::milliseconds(interval))) return;
frameLimit.update(); frameLimit.update();
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
Dvar::Var queryLimit("net_serverQueryLimit"); Dvar::Var queryLimit("net_serverQueryLimit");
int sentRequests = 0; int sentRequests = 0;
for (auto i = Node::Nodes.begin(); i != Node::Nodes.end();) for (auto i = Nodes.begin(); i != Nodes.end();)
{ {
if (i->isDead()) if (i->isDead())
{ {
i = Node::Nodes.erase(i); i = Nodes.erase(i);
continue; continue;
} }
if (sentRequests < queryLimit.get<int>() && i->requiresRequest()) if (sentRequests < queryLimit.get<int>() && i->requiresRequest())
@ -204,10 +232,9 @@ namespace Components
void Node::Synchronize() void Node::Synchronize()
{ {
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
for (auto& node : Node::Nodes) for (auto& node : Nodes)
{ {
//if (node.isValid()) // Comment out to simulate 'syncnodes' behaviour
{ {
node.reset(); node.reset();
} }
@ -223,7 +250,7 @@ namespace Components
Logger::Debug("Received response from {}", address.getString()); Logger::Debug("Received response from {}", address.getString());
#endif #endif
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
for (int i = 0; i < list.nodes_size(); ++i) for (int i = 0; i < list.nodes_size(); ++i)
{ {
@ -231,7 +258,7 @@ namespace Components
if (addr.size() == sizeof(sockaddr)) if (addr.size() == sizeof(sockaddr))
{ {
Node::Add(reinterpret_cast<sockaddr*>(const_cast<char*>(addr.data()))); Add(reinterpret_cast<sockaddr*>(const_cast<char*>(addr.data())));
} }
} }
@ -251,7 +278,7 @@ namespace Components
#endif #endif
} }
for (auto& node : Node::Nodes) for (auto& node : Nodes)
{ {
if (address == node.address) if (address == node.address)
{ {
@ -263,35 +290,37 @@ namespace Components
} }
} }
Node::Entry entry; Entry entry;
entry.address = address; entry.address = address;
entry.data.protocol = list.protocol(); entry.data.protocol = list.protocol();
entry.lastResponse.emplace(Utils::Time::Point()); entry.lastResponse.emplace(Utils::Time::Point());
Node::Nodes.push_back(entry); Nodes.push_back(entry);
} }
} }
void Node::SendList(const Network::Address& address) void Node::SendList(const Network::Address& address)
{ {
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
// need to keep the message size below 1404 bytes else recipient will just drop it // need to keep the message size below 1404 bytes else recipient will just drop it
std::vector<std::string> nodeListReponseMessages; std::vector<std::string> nodeListReponseMessages;
for (std::size_t curNode = 0; curNode < Node::Nodes.size();) for (std::size_t curNode = 0; curNode < Nodes.size();)
{ {
Proto::Node::List list; Proto::Node::List list;
list.set_isnode(Dedicated::IsEnabled()); list.set_isnode(Dedicated::IsEnabled());
list.set_protocol(PROTOCOL); list.set_protocol(PROTOCOL);
list.set_port(Node::GetPort()); list.set_port(GetPort());
for (std::size_t i = 0; i < NODE_MAX_NODES_TO_SEND;) for (std::size_t i = 0; i < NODE_MAX_NODES_TO_SEND;)
{ {
if (curNode >= Node::Nodes.size()) if (curNode >= Nodes.size())
{
break; break;
}
auto node = Node::Nodes.at(curNode++); auto node = Nodes.at(curNode++);
if (node.isValid()) if (node.isValid())
{ {
@ -326,6 +355,44 @@ namespace Components
return Network::GetPort(); return Network::GetPort();
} }
void Node::Migrate()
{
Proto::Node::List list;
std::string nodes;
if (!Utils::IO::ReadFile("players/nodes.dat", &nodes) || nodes.empty())
{
return;
}
if (!list.ParseFromString(Utils::Compression::ZLib::Decompress(nodes)))
{
return;
}
std::vector<std::string> data;
for (auto i = 0; i < list.nodes_size(); ++i)
{
const std::string& addr = list.nodes(i);
if (addr.size() == sizeof(sockaddr))
{
Network::Address address(reinterpret_cast<sockaddr*>(const_cast<char*>(addr.data())));
data.emplace_back(address.getString());
}
}
nlohmann::json out;
out["nodes"] = data;
if (!Utils::IO::FileExists("players/nodes.json"))
{
Utils::IO::WriteFile("players/nodes.json", out.dump());
}
Utils::IO::RemoveFile("players/nodes.dat");
}
Node::Node() Node::Node()
{ {
if (ZoneBuilder::IsEnabled()) return; if (ZoneBuilder::IsEnabled()) return;
@ -333,32 +400,33 @@ namespace Components
Scheduler::Loop([] Scheduler::Loop([]
{ {
Node::StoreNodes(false); StoreNodes(false);
}, Scheduler::Pipeline::ASYNC); }, Scheduler::Pipeline::ASYNC);
Scheduler::Loop(Node::RunFrame, Scheduler::Pipeline::MAIN); Scheduler::Loop(RunFrame, Scheduler::Pipeline::MAIN);
Session::Handle("nodeListResponse", Node::HandleResponse); Session::Handle("nodeListResponse", HandleResponse);
Session::Handle("nodeListRequest", [](const Network::Address& address, [[maybe_unused]] const std::string& data) Session::Handle("nodeListRequest", [](const Network::Address& address, [[maybe_unused]] const std::string& data)
{ {
Node::SendList(address); SendList(address);
}); });
// Load stored nodes // Load stored nodes
auto loadNodes = [] auto loadNodes = []
{ {
Node::LoadNodePreset(); Migrate();
Node::LoadNodes(); LoadNodePreset();
LoadNodes();
}; };
Scheduler::OnGameInitialized(loadNodes, Scheduler::Pipeline::MAIN); Scheduler::OnGameInitialized(loadNodes, Scheduler::Pipeline::MAIN);
Command::Add("listNodes", [](const Command::Params*) Command::Add("listNodes", [](const Command::Params*)
{ {
Logger::Print("Nodes: {}\n", Node::Nodes.size()); Logger::Print("Nodes: {}\n", Nodes.size());
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
for (const auto& node : Node::Nodes) for (const auto& node : Nodes)
{ {
Logger::Print("{}\t({})\n", node.address.getString(), node.isValid() ? "Valid" : "Invalid"); Logger::Print("{}\t({})\n", node.address.getString(), node.isValid() ? "Valid" : "Invalid");
} }
@ -370,15 +438,15 @@ namespace Components
auto address = Network::Address{ params->get(1) }; auto address = Network::Address{ params->get(1) };
if (address.isValid()) if (address.isValid())
{ {
Node::Add(address); Add(address);
} }
}); });
} }
Node::~Node() void Node::preDestroy()
{ {
std::lock_guard _(Node::Mutex); std::lock_guard _(Mutex);
Node::StoreNodes(true); StoreNodes(true);
Node::Nodes.clear(); Nodes.clear();
} }
} }

View File

@ -34,7 +34,7 @@ namespace Components
}; };
Node(); Node();
~Node(); void preDestroy() override;
static void Add(Network::Address address); static void Add(Network::Address address);
static std::vector<Entry> GetNodes(); static std::vector<Entry> GetNodes();
@ -55,5 +55,7 @@ namespace Components
static void StoreNodes(bool force); static void StoreNodes(bool force);
static unsigned short GetPort(); static unsigned short GetPort();
static void Migrate();
}; };
} }