Playlist stuff.

This commit is contained in:
momo5502 2016-01-09 15:30:13 +01:00
parent 5624390e67
commit 8343ce5f13
6 changed files with 141 additions and 54 deletions

View File

@ -131,20 +131,23 @@ namespace Components
void Download::PacketResponse(Network::Address target, std::string data)
{
//Logger::Print("Packet incoming!\n");
if (data.size() < sizeof(Download::Container::Packet)) return; // Drop invalid packets, if they were important, we'll re-request them later
Download::Container::Packet* packet = (Download::Container::Packet*)data.data();
//Logger::Print("Reading data!\n");
if (data.size() < (sizeof(Download::Container::Packet) + packet->length)) return; // Again, drop invalid packets
//Logger::Print("Finding corresponding download!\n");
auto download = Download::FindClientDownload(packet->id);
if (download && download->target == target)
{
//Logger::Print("Parsing packet!\n");
download->lastPing = Game::Com_Milliseconds();
std::string packetData(data.data() + sizeof(Download::Container::Packet), packet->length);
if (packet->hash == Utils::OneAtATime(packetData.data(), packetData.size()))
{
//Logger::Print("Packet added!\n");
download->parts[packet->partId] = packetData;
if (Download::HasReceivedAllPackets(download))
@ -153,6 +156,10 @@ namespace Components
Download::RemoveClientDownload(download->id);
}
}
else
{
Logger::Print("Hash invalid!\n");
}
}
}
@ -176,6 +183,7 @@ namespace Components
{
download->lastPing = Game::Com_Milliseconds();
download->acknowledged = true;
Logger::Print("Client acknowledged!\n");
}
}
}
@ -208,10 +216,9 @@ namespace Components
download.lastPing = Game::Com_Milliseconds();
download.maxParts = 0;
// Generate random 40kb buffer
for (int i = 0; i < 10000; i++)
for (int i = 0; i < 1000000; i++)
{
download.buffer.append(Utils::VA("%i", i));
download.buffer.append("1234567890");
}
download.maxParts = download.buffer.size() / PACKET_SIZE;
@ -251,6 +258,8 @@ namespace Components
{
if (packets.size())
{
download->lastPing = Game::Com_Milliseconds();
std::string data = "dlMissRequest\n";
data.append(reinterpret_cast<char*>(&download->id), sizeof(int));
@ -280,7 +289,7 @@ namespace Components
void Download::SendPacket(Download::Container::DownloadSV* download, int packet)
{
if (!download || packet < download->maxParts) return;
if (!download || packet >= download->maxParts) return;
download->lastPing = Game::Com_Milliseconds();
download->sentParts.push_back(packet);
@ -316,7 +325,7 @@ namespace Components
}
// Request missing parts
if ((Game::Com_Milliseconds() - i->lastPing) > DOWNLOAD_TIMEOUT)
if (i->acknowledged && (Game::Com_Milliseconds() - i->lastPing) > DOWNLOAD_TIMEOUT)
{
std::vector<int> missingPackets;
for (int j = 0; j < i->maxParts; j++)
@ -326,6 +335,8 @@ namespace Components
missingPackets.push_back(j);
}
}
Download::RequestMissingPackets(&*i, missingPackets);
}
}
}
@ -341,10 +352,11 @@ namespace Components
}
int packets = 0;
for (int j = 0; j < i->maxParts && packets <= FRAME_PACKET_LIMIT; j++)
for (int j = 0; j < i->maxParts && packets <= FRAME_PACKET_LIMIT && i->acknowledged; j++)
{
if (!Download::HasSentPacket(&*i, j))
{
//Logger::Print("Sending packet...\n");
Download::SendPacket(&*i, j);
packets++;
}
@ -406,7 +418,7 @@ namespace Components
Logger::Print("Requesting!\n");
Download::Get(Network::Address("192.168.0.23:28960"), "test", [] (int id, std::string data)
{
Logger::Print("Download succeeded!\n");
Logger::Print("Download succeeded %d!\n", Game::Com_Milliseconds() - (Download::FindClientDownload(id)->startTime));
}, [] (int id)
{
Logger::Print("Download failed!\n");

View File

@ -1,4 +1,4 @@
#define FRAME_PACKET_LIMIT 10
#define FRAME_PACKET_LIMIT 20
#define DOWNLOAD_TIMEOUT 2500
#define PACKET_SIZE 1000
@ -7,13 +7,6 @@ namespace Components
class Download : public Component
{
public:
Download();
~Download();
const char* GetName() { return "Download"; };
static int Get(Network::Address target, std::string file, std::function<void(int, std::string)> successCallback, std::function<void(int)> failureCallback);
private:
struct Container
{
struct DownloadCL
@ -61,6 +54,16 @@ namespace Components
std::vector<DownloadSV> ServerDownloads;
};
Download();
~Download();
const char* GetName() { return "Download"; };
static int Get(Network::Address target, std::string file, std::function<void(int, std::string)> successCallback, std::function<void(int)> failureCallback);
static Container::DownloadCL* FindClientDownload(int id);
static Container::DownloadSV* FindServerDownload(int id);
private:
static void Frame();
static Container DataContainer;
@ -74,9 +77,6 @@ namespace Components
static void DownloadRequest(Network::Address target, std::string data);
// Helper functions
static Container::DownloadCL* FindClientDownload(int id);
static Container::DownloadSV* FindServerDownload(int id);
static void RemoveClientDownload(int id);
static void RemoveServerDownload(int id);

View File

@ -55,6 +55,7 @@ namespace Components
static void BroadcastAll(std::string data);
private:
static SOCKET TcpSocket;
static std::string SelectedPacket;
static std::map<std::string, Callback> PacketHandlers;
static int PacketInterceptionHandler(const char* packet);

View File

@ -45,23 +45,34 @@ namespace Components
{
Logger::Print("Received playlist request, sending currently stored buffer.\n");
// Split playlist data
unsigned int maxPacketSize = 1000;
unsigned int maxBytes = Playlist::CurrentPlaylistBuffer.size();
// // Split playlist data
// unsigned int maxPacketSize = 1000;
// unsigned int maxBytes = Playlist::CurrentPlaylistBuffer.size();
//
// for (unsigned int i = 0; i < maxBytes; i += maxPacketSize)
// {
// unsigned int sendBytes = min(maxPacketSize, maxBytes - i);
// unsigned int sentBytes = i + sendBytes;
//
// std::string data;
// data.append(reinterpret_cast<char*>(&sentBytes), 4); // Sent bytes
// data.append(reinterpret_cast<char*>(&maxBytes), 4); // Max bytes
//
// data.append(Playlist::CurrentPlaylistBuffer.data() + i, sendBytes);
//
// Network::SendRaw(address, std::string("playlistresponse\n") + data);
// }
for (unsigned int i = 0; i < maxBytes; i += maxPacketSize)
{
unsigned int sendBytes = min(maxPacketSize, maxBytes - i);
unsigned int sentBytes = i + sendBytes;
std::string compressedList = Utils::Compression::ZLib::Compress(Playlist::CurrentPlaylistBuffer);
unsigned int size = compressedList.size();
unsigned int hash = Utils::OneAtATime(compressedList.data(), compressedList.size());
std::string data;
data.append(reinterpret_cast<char*>(&sentBytes), 4); // Sent bytes
data.append(reinterpret_cast<char*>(&maxBytes), 4); // Max bytes
std::string response = "playlistresponse\n";
response.append(reinterpret_cast<char*>(&hash), 4);
response.append(reinterpret_cast<char*>(&size), 4);
response.append(compressedList);
data.append(Playlist::CurrentPlaylistBuffer.data() + i, sendBytes);
Network::SendRaw(address, std::string("playlistresponse\n") + data);
}
Network::SendRaw(address, response);
}
void Playlist::PlaylistReponse(Network::Address address, std::string data)
@ -78,34 +89,59 @@ namespace Components
}
else
{
unsigned int sentBytes = *(unsigned int*)(data.data() + 0);
unsigned int maxBytes = *(unsigned int*)(data.data() + 4);
// unsigned int sentBytes = *(unsigned int*)(data.data() + 0);
// unsigned int maxBytes = *(unsigned int*)(data.data() + 4);
//
// // Clear current buffer, if we receive a new packet
// if (data.size() - 8 == sentBytes) Playlist::ReceivedPlaylistBuffer.clear();
//
// // Append received data
// Playlist::ReceivedPlaylistBuffer.append(data.data() + 8, data.size() - 8);
//
// if (Playlist::ReceivedPlaylistBuffer.size() != sentBytes)
// {
// Party::PlaylistError(Utils::VA("Received playlist data, but it seems invalid: %d != %d", sentBytes, Playlist::ReceivedPlaylistBuffer.size()));
// Playlist::ReceivedPlaylistBuffer.clear();
// return;
// }
// else
// {
// Logger::Print("Received playlist data: %d/%d (%d%%)\n", sentBytes, maxBytes, ((100 * sentBytes) / maxBytes));
// }
//
// if (Playlist::ReceivedPlaylistBuffer.size() == maxBytes)
// {
// Logger::Print("Received playlist, loading and continuing connection...\n");
// Game::Live_ParsePlaylists(Playlist::ReceivedPlaylistBuffer.data());
// Party::PlaylistContinue();
//
// Playlist::ReceivedPlaylistBuffer.clear();
// }
// Clear current buffer, if we receive a new packet
if (data.size() - 8 == sentBytes) Playlist::ReceivedPlaylistBuffer.clear();
unsigned int hash = *(unsigned int*)data.data();
unsigned int length = *(unsigned int*)(data.data() + 4);
// Append received data
Playlist::ReceivedPlaylistBuffer.append(data.data() + 8, data.size() - 8);
if (Playlist::ReceivedPlaylistBuffer.size() != sentBytes)
if (length > (data.size() - 8))
{
Party::PlaylistError(Utils::VA("Received playlist data, but it seems invalid: %d != %d", sentBytes, Playlist::ReceivedPlaylistBuffer.size()));
Party::PlaylistError(Utils::VA("Received playlist response, but it is too short."));
Playlist::ReceivedPlaylistBuffer.clear();
return;
}
else
unsigned int hash2 = Utils::OneAtATime(data.data() + 8, length);
std::string compressedData(data.data() + 8, length);
Playlist::ReceivedPlaylistBuffer = Utils::Compression::ZLib::Decompress(compressedData);
if (hash2 != hash)
{
Logger::Print("Received playlist data: %d/%d (%d%%)\n", sentBytes, maxBytes, ((100 * sentBytes) / maxBytes));
Party::PlaylistError(Utils::VA("Received playlist response, but the checksum did not match (%d != %d).", hash, hash2));
Playlist::ReceivedPlaylistBuffer.clear();
return;
}
if (Playlist::ReceivedPlaylistBuffer.size() == maxBytes)
{
Logger::Print("Received playlist, loading and continuing connection...\n");
Game::Live_ParsePlaylists(Playlist::ReceivedPlaylistBuffer.data());
Party::PlaylistContinue();
Playlist::ReceivedPlaylistBuffer.clear();
}
}
}
else

View File

@ -44,8 +44,44 @@ namespace Utils
std::string ZLib::Decompress(std::string data)
{
//#error "Not implemented yet!"
return data;
z_stream stream;
ZeroMemory(&stream, sizeof(stream));
std::string buffer;
if (inflateInit(&stream) != Z_OK)
{
return buffer;
}
int ret = 0;
uint8_t dest[CHUNK] = { 0 };
const char* dataPtr = data.data();
do
{
stream.avail_in = min(CHUNK, data.size() - (dataPtr - data.data()));
stream.next_in = reinterpret_cast<const uint8_t*>(dataPtr);
do
{
stream.avail_out = CHUNK;
stream.next_out = dest;
ret = inflate(&stream, Z_NO_FLUSH);
if (ret == Z_STREAM_ERROR)
{
inflateEnd(&stream);
}
buffer.append(reinterpret_cast<const char*>(dest), CHUNK - stream.avail_out);
} while (stream.avail_out == 0);
} while (ret != Z_STREAM_END);
inflateEnd(&stream);
return buffer;
}
};
}

View File

@ -1,3 +1,5 @@
#define CHUNK 16384
namespace Utils
{
namespace Compression