From 8343ce5f13ddc2833d19754e039c69ce197f8191 Mon Sep 17 00:00:00 2001 From: momo5502 Date: Sat, 9 Jan 2016 15:30:13 +0100 Subject: [PATCH] Playlist stuff. --- src/Components/Modules/Download.cpp | 30 ++++++--- src/Components/Modules/Download.hpp | 22 +++--- src/Components/Modules/Network.hpp | 1 + src/Components/Modules/Playlist.cpp | 100 +++++++++++++++++++--------- src/Utils/Compression.cpp | 40 ++++++++++- src/Utils/Compression.hpp | 2 + 6 files changed, 141 insertions(+), 54 deletions(-) diff --git a/src/Components/Modules/Download.cpp b/src/Components/Modules/Download.cpp index 72af0bda..693cc1da 100644 --- a/src/Components/Modules/Download.cpp +++ b/src/Components/Modules/Download.cpp @@ -131,20 +131,23 @@ namespace Components void Download::PacketResponse(Network::Address target, std::string data) { + //Logger::Print("Packet incoming!\n"); if (data.size() < sizeof(Download::Container::Packet)) return; // Drop invalid packets, if they were important, we'll re-request them later Download::Container::Packet* packet = (Download::Container::Packet*)data.data(); - + //Logger::Print("Reading data!\n"); if (data.size() < (sizeof(Download::Container::Packet) + packet->length)) return; // Again, drop invalid packets - + //Logger::Print("Finding corresponding download!\n"); auto download = Download::FindClientDownload(packet->id); if (download && download->target == target) { + //Logger::Print("Parsing packet!\n"); download->lastPing = Game::Com_Milliseconds(); std::string packetData(data.data() + sizeof(Download::Container::Packet), packet->length); if (packet->hash == Utils::OneAtATime(packetData.data(), packetData.size())) { + //Logger::Print("Packet added!\n"); download->parts[packet->partId] = packetData; if (Download::HasReceivedAllPackets(download)) @@ -153,6 +156,10 @@ namespace Components Download::RemoveClientDownload(download->id); } } + else + { + Logger::Print("Hash invalid!\n"); + } } } @@ -176,6 +183,7 @@ namespace Components { download->lastPing = Game::Com_Milliseconds(); download->acknowledged = true; + Logger::Print("Client acknowledged!\n"); } } } @@ -208,10 +216,9 @@ namespace Components download.lastPing = Game::Com_Milliseconds(); download.maxParts = 0; - // Generate random 40kb buffer - for (int i = 0; i < 10000; i++) + for (int i = 0; i < 1000000; i++) { - download.buffer.append(Utils::VA("%i", i)); + download.buffer.append("1234567890"); } download.maxParts = download.buffer.size() / PACKET_SIZE; @@ -251,6 +258,8 @@ namespace Components { if (packets.size()) { + download->lastPing = Game::Com_Milliseconds(); + std::string data = "dlMissRequest\n"; data.append(reinterpret_cast(&download->id), sizeof(int)); @@ -280,7 +289,7 @@ namespace Components void Download::SendPacket(Download::Container::DownloadSV* download, int packet) { - if (!download || packet < download->maxParts) return; + if (!download || packet >= download->maxParts) return; download->lastPing = Game::Com_Milliseconds(); download->sentParts.push_back(packet); @@ -316,7 +325,7 @@ namespace Components } // Request missing parts - if ((Game::Com_Milliseconds() - i->lastPing) > DOWNLOAD_TIMEOUT) + if (i->acknowledged && (Game::Com_Milliseconds() - i->lastPing) > DOWNLOAD_TIMEOUT) { std::vector missingPackets; for (int j = 0; j < i->maxParts; j++) @@ -326,6 +335,8 @@ namespace Components missingPackets.push_back(j); } } + + Download::RequestMissingPackets(&*i, missingPackets); } } } @@ -341,10 +352,11 @@ namespace Components } int packets = 0; - for (int j = 0; j < i->maxParts && packets <= FRAME_PACKET_LIMIT; j++) + for (int j = 0; j < i->maxParts && packets <= FRAME_PACKET_LIMIT && i->acknowledged; j++) { if (!Download::HasSentPacket(&*i, j)) { + //Logger::Print("Sending packet...\n"); Download::SendPacket(&*i, j); packets++; } @@ -406,7 +418,7 @@ namespace Components Logger::Print("Requesting!\n"); Download::Get(Network::Address("192.168.0.23:28960"), "test", [] (int id, std::string data) { - Logger::Print("Download succeeded!\n"); + Logger::Print("Download succeeded %d!\n", Game::Com_Milliseconds() - (Download::FindClientDownload(id)->startTime)); }, [] (int id) { Logger::Print("Download failed!\n"); diff --git a/src/Components/Modules/Download.hpp b/src/Components/Modules/Download.hpp index dfda0dcf..a7ec12a2 100644 --- a/src/Components/Modules/Download.hpp +++ b/src/Components/Modules/Download.hpp @@ -1,4 +1,4 @@ -#define FRAME_PACKET_LIMIT 10 +#define FRAME_PACKET_LIMIT 20 #define DOWNLOAD_TIMEOUT 2500 #define PACKET_SIZE 1000 @@ -7,13 +7,6 @@ namespace Components class Download : public Component { public: - Download(); - ~Download(); - const char* GetName() { return "Download"; }; - - static int Get(Network::Address target, std::string file, std::function successCallback, std::function failureCallback); - - private: struct Container { struct DownloadCL @@ -61,6 +54,16 @@ namespace Components std::vector ServerDownloads; }; + Download(); + ~Download(); + const char* GetName() { return "Download"; }; + + static int Get(Network::Address target, std::string file, std::function successCallback, std::function failureCallback); + + static Container::DownloadCL* FindClientDownload(int id); + static Container::DownloadSV* FindServerDownload(int id); + + private: static void Frame(); static Container DataContainer; @@ -74,9 +77,6 @@ namespace Components static void DownloadRequest(Network::Address target, std::string data); // Helper functions - static Container::DownloadCL* FindClientDownload(int id); - static Container::DownloadSV* FindServerDownload(int id); - static void RemoveClientDownload(int id); static void RemoveServerDownload(int id); diff --git a/src/Components/Modules/Network.hpp b/src/Components/Modules/Network.hpp index 6aaeb531..b3fb7da0 100644 --- a/src/Components/Modules/Network.hpp +++ b/src/Components/Modules/Network.hpp @@ -55,6 +55,7 @@ namespace Components static void BroadcastAll(std::string data); private: + static SOCKET TcpSocket; static std::string SelectedPacket; static std::map PacketHandlers; static int PacketInterceptionHandler(const char* packet); diff --git a/src/Components/Modules/Playlist.cpp b/src/Components/Modules/Playlist.cpp index 6bd195f8..4585ebdb 100644 --- a/src/Components/Modules/Playlist.cpp +++ b/src/Components/Modules/Playlist.cpp @@ -45,23 +45,34 @@ namespace Components { Logger::Print("Received playlist request, sending currently stored buffer.\n"); - // Split playlist data - unsigned int maxPacketSize = 1000; - unsigned int maxBytes = Playlist::CurrentPlaylistBuffer.size(); +// // Split playlist data +// unsigned int maxPacketSize = 1000; +// unsigned int maxBytes = Playlist::CurrentPlaylistBuffer.size(); +// +// for (unsigned int i = 0; i < maxBytes; i += maxPacketSize) +// { +// unsigned int sendBytes = min(maxPacketSize, maxBytes - i); +// unsigned int sentBytes = i + sendBytes; +// +// std::string data; +// data.append(reinterpret_cast(&sentBytes), 4); // Sent bytes +// data.append(reinterpret_cast(&maxBytes), 4); // Max bytes +// +// data.append(Playlist::CurrentPlaylistBuffer.data() + i, sendBytes); +// +// Network::SendRaw(address, std::string("playlistresponse\n") + data); +// } - for (unsigned int i = 0; i < maxBytes; i += maxPacketSize) - { - unsigned int sendBytes = min(maxPacketSize, maxBytes - i); - unsigned int sentBytes = i + sendBytes; + std::string compressedList = Utils::Compression::ZLib::Compress(Playlist::CurrentPlaylistBuffer); + unsigned int size = compressedList.size(); + unsigned int hash = Utils::OneAtATime(compressedList.data(), compressedList.size()); - std::string data; - data.append(reinterpret_cast(&sentBytes), 4); // Sent bytes - data.append(reinterpret_cast(&maxBytes), 4); // Max bytes + std::string response = "playlistresponse\n"; + response.append(reinterpret_cast(&hash), 4); + response.append(reinterpret_cast(&size), 4); + response.append(compressedList); - data.append(Playlist::CurrentPlaylistBuffer.data() + i, sendBytes); - - Network::SendRaw(address, std::string("playlistresponse\n") + data); - } + Network::SendRaw(address, response); } void Playlist::PlaylistReponse(Network::Address address, std::string data) @@ -78,34 +89,59 @@ namespace Components } else { - unsigned int sentBytes = *(unsigned int*)(data.data() + 0); - unsigned int maxBytes = *(unsigned int*)(data.data() + 4); +// unsigned int sentBytes = *(unsigned int*)(data.data() + 0); +// unsigned int maxBytes = *(unsigned int*)(data.data() + 4); +// +// // Clear current buffer, if we receive a new packet +// if (data.size() - 8 == sentBytes) Playlist::ReceivedPlaylistBuffer.clear(); +// +// // Append received data +// Playlist::ReceivedPlaylistBuffer.append(data.data() + 8, data.size() - 8); +// +// if (Playlist::ReceivedPlaylistBuffer.size() != sentBytes) +// { +// Party::PlaylistError(Utils::VA("Received playlist data, but it seems invalid: %d != %d", sentBytes, Playlist::ReceivedPlaylistBuffer.size())); +// Playlist::ReceivedPlaylistBuffer.clear(); +// return; +// } +// else +// { +// Logger::Print("Received playlist data: %d/%d (%d%%)\n", sentBytes, maxBytes, ((100 * sentBytes) / maxBytes)); +// } +// +// if (Playlist::ReceivedPlaylistBuffer.size() == maxBytes) +// { +// Logger::Print("Received playlist, loading and continuing connection...\n"); +// Game::Live_ParsePlaylists(Playlist::ReceivedPlaylistBuffer.data()); +// Party::PlaylistContinue(); +// +// Playlist::ReceivedPlaylistBuffer.clear(); +// } - // Clear current buffer, if we receive a new packet - if (data.size() - 8 == sentBytes) Playlist::ReceivedPlaylistBuffer.clear(); + unsigned int hash = *(unsigned int*)data.data(); + unsigned int length = *(unsigned int*)(data.data() + 4); - // Append received data - Playlist::ReceivedPlaylistBuffer.append(data.data() + 8, data.size() - 8); - - if (Playlist::ReceivedPlaylistBuffer.size() != sentBytes) + if (length > (data.size() - 8)) { - Party::PlaylistError(Utils::VA("Received playlist data, but it seems invalid: %d != %d", sentBytes, Playlist::ReceivedPlaylistBuffer.size())); + Party::PlaylistError(Utils::VA("Received playlist response, but it is too short.")); Playlist::ReceivedPlaylistBuffer.clear(); return; } - else - { - Logger::Print("Received playlist data: %d/%d (%d%%)\n", sentBytes, maxBytes, ((100 * sentBytes) / maxBytes)); - } - if (Playlist::ReceivedPlaylistBuffer.size() == maxBytes) - { - Logger::Print("Received playlist, loading and continuing connection...\n"); - Game::Live_ParsePlaylists(Playlist::ReceivedPlaylistBuffer.data()); - Party::PlaylistContinue(); + unsigned int hash2 = Utils::OneAtATime(data.data() + 8, length); + std::string compressedData(data.data() + 8, length); + Playlist::ReceivedPlaylistBuffer = Utils::Compression::ZLib::Decompress(compressedData); + if (hash2 != hash) + { + Party::PlaylistError(Utils::VA("Received playlist response, but the checksum did not match (%d != %d).", hash, hash2)); Playlist::ReceivedPlaylistBuffer.clear(); + return; } + + Logger::Print("Received playlist, loading and continuing connection...\n"); + Game::Live_ParsePlaylists(Playlist::ReceivedPlaylistBuffer.data()); + Party::PlaylistContinue(); } } else diff --git a/src/Utils/Compression.cpp b/src/Utils/Compression.cpp index 9b69f99b..e64e9442 100644 --- a/src/Utils/Compression.cpp +++ b/src/Utils/Compression.cpp @@ -44,8 +44,44 @@ namespace Utils std::string ZLib::Decompress(std::string data) { - //#error "Not implemented yet!" - return data; + z_stream stream; + ZeroMemory(&stream, sizeof(stream)); + std::string buffer; + + if (inflateInit(&stream) != Z_OK) + { + return buffer; + } + + int ret = 0; + uint8_t dest[CHUNK] = { 0 }; + const char* dataPtr = data.data(); + + do + { + stream.avail_in = min(CHUNK, data.size() - (dataPtr - data.data())); + stream.next_in = reinterpret_cast(dataPtr); + + do + { + stream.avail_out = CHUNK; + stream.next_out = dest; + + ret = inflate(&stream, Z_NO_FLUSH); + if (ret == Z_STREAM_ERROR) + { + inflateEnd(&stream); + } + + buffer.append(reinterpret_cast(dest), CHUNK - stream.avail_out); + + } while (stream.avail_out == 0); + + } while (ret != Z_STREAM_END); + + inflateEnd(&stream); + + return buffer; } }; } diff --git a/src/Utils/Compression.hpp b/src/Utils/Compression.hpp index 2679f617..2c826294 100644 --- a/src/Utils/Compression.hpp +++ b/src/Utils/Compression.hpp @@ -1,3 +1,5 @@ +#define CHUNK 16384 + namespace Utils { namespace Compression