cleanup and simplify the CoD RCon implementation

This commit is contained in:
RaidMax 2022-10-23 14:03:57 -05:00
parent f4e7d5daf9
commit c06b0982a7
4 changed files with 199 additions and 415 deletions

View File

@ -229,14 +229,13 @@ namespace IW4MAdmin.Application
{ {
// store the server hash code and task for it // store the server hash code and task for it
var runningUpdateTasks = new Dictionary<long, (Task task, CancellationTokenSource tokenSource, DateTime startTime)>(); var runningUpdateTasks = new Dictionary<long, (Task task, CancellationTokenSource tokenSource, DateTime startTime)>();
var timeout = TimeSpan.FromSeconds(60);
while (!_tokenSource.IsCancellationRequested) while (!_tokenSource.IsCancellationRequested) // main shutdown requested
{ {
// select the server ids that have completed the update task // select the server ids that have completed the update task
var serverTasksToRemove = runningUpdateTasks var serverTasksToRemove = runningUpdateTasks
.Where(ut => ut.Value.task.Status == TaskStatus.RanToCompletion || .Where(ut => ut.Value.task.IsCompleted)
ut.Value.task.Status == TaskStatus.Canceled || // we want to cancel if a task takes longer than 5 minutes
ut.Value.task.Status == TaskStatus.Faulted || DateTime.Now - ut.Value.startTime > TimeSpan.FromMinutes(5))
.Select(ut => ut.Key) .Select(ut => ut.Key)
.ToList(); .ToList();
@ -252,36 +251,16 @@ namespace IW4MAdmin.Application
} }
// select the servers where the tasks have completed // select the servers where the tasks have completed
var serverIds = Servers.Select(s => s.EndPoint).Except(runningUpdateTasks.Select(r => r.Key)).ToList(); var newTaskServers = Servers.Select(s => s.EndPoint).Except(runningUpdateTasks.Select(r => r.Key)).ToList();
foreach (var server in Servers.Where(s => serverIds.Contains(s.EndPoint)))
foreach (var server in Servers.Where(s => newTaskServers.Contains(s.EndPoint)))
{ {
var tokenSource = new CancellationTokenSource(); var firstTokenSource = new CancellationTokenSource();
runningUpdateTasks.Add(server.EndPoint, (Task.Run(async () => firstTokenSource.CancelAfter(timeout);
{ var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(firstTokenSource.Token, _tokenSource.Token);
try runningUpdateTasks.Add(server.EndPoint, (ProcessUpdateHandler(server, linkedTokenSource.Token), linkedTokenSource, DateTime.Now));
{
if (runningUpdateTasks.ContainsKey(server.EndPoint))
{
await server.ProcessUpdatesAsync(_tokenSource.Token)
.WithWaitCancellation(runningUpdateTasks[server.EndPoint].tokenSource.Token);
}
}
catch (Exception e)
{
using (LogContext.PushProperty("Server", server.ToString()))
{
_logger.LogError(e, "Failed to update status");
}
}
finally
{
server.IsInitialized = true;
}
}, tokenSource.Token), tokenSource, DateTime.Now));
} }
try try
{ {
await Task.Delay(ConfigHandler.Configuration().RConPollRate, _tokenSource.Token); await Task.Delay(ConfigHandler.Configuration().RConPollRate, _tokenSource.Token);
@ -289,7 +268,7 @@ namespace IW4MAdmin.Application
// if a cancellation is received, we want to return immediately after shutting down // if a cancellation is received, we want to return immediately after shutting down
catch catch
{ {
foreach (var server in Servers.Where(s => serverIds.Contains(s.EndPoint))) foreach (var server in Servers.Where(s => newTaskServers.Contains(s.EndPoint)))
{ {
await server.ProcessUpdatesAsync(_tokenSource.Token); await server.ProcessUpdatesAsync(_tokenSource.Token);
} }
@ -298,6 +277,25 @@ namespace IW4MAdmin.Application
} }
} }
private async Task ProcessUpdateHandler(Server server, CancellationToken token)
{
try
{
await server.ProcessUpdatesAsync(token);
}
catch (Exception ex)
{
using (LogContext.PushProperty("Server", server.ToString()))
{
_logger.LogError(ex, "Failed to update status");
}
}
finally
{
server.IsInitialized = true;
}
}
public async Task Init() public async Task Init()
{ {
IsRunning = true; IsRunning = true;
@ -617,6 +615,7 @@ namespace IW4MAdmin.Application
{ {
IsRestartRequested = true; IsRestartRequested = true;
Stop().GetAwaiter().GetResult(); Stop().GetAwaiter().GetResult();
_tokenSource.Dispose();
_tokenSource = new CancellationTokenSource(); _tokenSource = new CancellationTokenSource();
} }

View File

@ -56,6 +56,11 @@ namespace Integrations.Cod
{ {
return await SendQueryAsyncInternal(type, parameters, token); return await SendQueryAsyncInternal(type, parameters, token);
} }
catch (RConException ex) when (ex.IsOperationCancelled)
{
_log.LogDebug(ex, "Could not complete RCon request");
throw;
}
catch (Exception ex) catch (Exception ex)
{ {
using (LogContext.PushProperty("Server", Endpoint.ToString())) using (LogContext.PushProperty("Server", Endpoint.ToString()))
@ -67,10 +72,7 @@ namespace Integrations.Cod
} }
finally finally
{ {
using (LogContext.PushProperty("Server", Endpoint.ToString())) _log.LogDebug("Releasing OnComplete {Count}", ActiveQueries[Endpoint].OnComplete.CurrentCount);
{
_log.LogDebug("Releasing OnComplete {Count}", ActiveQueries[Endpoint].OnComplete.CurrentCount);
}
if (ActiveQueries[Endpoint].OnComplete.CurrentCount == 0) if (ActiveQueries[Endpoint].OnComplete.CurrentCount == 0)
{ {
@ -89,7 +91,11 @@ namespace Integrations.Cod
if (!ActiveQueries.TryGetValue(Endpoint, out var connectionState)) if (!ActiveQueries.TryGetValue(Endpoint, out var connectionState))
{ {
_log.LogError("Could not retrieve connection state"); using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogError("Could not retrieve connection state");
}
throw new InvalidOperationException("Could not get connection state"); throw new InvalidOperationException("Could not get connection state");
} }
@ -104,7 +110,7 @@ namespace Integrations.Cod
{ {
_log.LogDebug("OnComplete did not complete before timeout {Count}", _log.LogDebug("OnComplete did not complete before timeout {Count}",
connectionState.OnComplete.CurrentCount); connectionState.OnComplete.CurrentCount);
throw new RConException("Timed out waiting for access to rcon socket"); throw new RConException("Timed out waiting for access to rcon socket", true);
} }
var timeSinceLastQuery = (DateTime.Now - connectionState.LastQuery).TotalMilliseconds; var timeSinceLastQuery = (DateTime.Now - connectionState.LastQuery).TotalMilliseconds;
@ -121,7 +127,7 @@ namespace Integrations.Cod
{ {
_log.LogDebug("Waiting for flood protect did not complete before timeout timeout {Count}", _log.LogDebug("Waiting for flood protect did not complete before timeout timeout {Count}",
connectionState.OnComplete.CurrentCount); connectionState.OnComplete.CurrentCount);
throw new RConException("Timed out waiting for flood protect to expire"); throw new RConException("Timed out waiting for flood protect to expire", true);
} }
} }
@ -145,7 +151,7 @@ namespace Integrations.Cod
switch (type) switch (type)
{ {
case StaticHelpers.QueryType.GET_DVAR: case StaticHelpers.QueryType.GET_DVAR:
waitForResponse |= true; waitForResponse = true;
payload = string payload = string
.Format(_config.CommandPrefixes.RConGetDvar, convertedRConPassword, .Format(_config.CommandPrefixes.RConGetDvar, convertedRConPassword,
convertedParameters + '\0').Select(Convert.ToByte).ToArray(); convertedParameters + '\0').Select(Convert.ToByte).ToArray();
@ -161,15 +167,15 @@ namespace Integrations.Cod
convertedParameters + '\0').Select(Convert.ToByte).ToArray(); convertedParameters + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.GET_STATUS: case StaticHelpers.QueryType.GET_STATUS:
waitForResponse |= true; waitForResponse = true;
payload = (_config.CommandPrefixes.RConGetStatus + '\0').Select(Convert.ToByte).ToArray(); payload = (_config.CommandPrefixes.RConGetStatus + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.GET_INFO: case StaticHelpers.QueryType.GET_INFO:
waitForResponse |= true; waitForResponse = true;
payload = (_config.CommandPrefixes.RConGetInfo + '\0').Select(Convert.ToByte).ToArray(); payload = (_config.CommandPrefixes.RConGetInfo + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.COMMAND_STATUS: case StaticHelpers.QueryType.COMMAND_STATUS:
waitForResponse |= true; waitForResponse = true;
payload = string.Format(_config.CommandPrefixes.RConCommand, convertedRConPassword, "status\0") payload = string.Format(_config.CommandPrefixes.RConCommand, convertedRConPassword, "status\0")
.Select(Convert.ToByte).ToArray(); .Select(Convert.ToByte).ToArray();
break; break;
@ -189,20 +195,9 @@ namespace Integrations.Cod
throw new RConException("Invalid character encountered when converting encodings"); throw new RConException("Invalid character encountered when converting encodings");
} }
byte[][] response = null; byte[][] response;
retrySend: retrySend:
if (connectionState.ConnectionAttempts > 1)
{
using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogInformation(
"Retrying RCon message ({ConnectionAttempts}/{AllowedConnectionFailures} attempts) with parameters {Payload}",
connectionState.ConnectionAttempts,
_retryAttempts, parameters);
}
}
using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp) using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
{ {
DontFragment = false, DontFragment = false,
@ -210,57 +205,43 @@ namespace Integrations.Cod
ExclusiveAddressUse = true, ExclusiveAddressUse = true,
}) })
{ {
// wait for send to be ready
try
{
await connectionState.OnSentData.WaitAsync(token);
}
catch (OperationCanceledException)
{
_log.LogDebug("OnSent did not complete in time");
throw new RConException("Timed out waiting for access to RCon send socket");
}
// wait for receive to be ready
try
{
await connectionState.OnReceivedData.WaitAsync(token);
}
catch (OperationCanceledException)
{
_log.LogDebug("OnReceived did not complete in time");
if (connectionState.OnSentData.CurrentCount == 0)
{
connectionState.OnSentData.Release();
}
throw new RConException("Timed out waiting for access to RCon receive socket");
}
connectionState.SendEventArgs.UserToken = new ConnectionUserToken
{
Socket = socket,
CancellationToken = token
};
if (!token.IsCancellationRequested) if (!token.IsCancellationRequested)
{ {
connectionState.ConnectionAttempts++; connectionState.ConnectionAttempts++;
} }
connectionState.BytesReadPerSegment.Clear(); connectionState.ReceivedBytes.Clear();
_log.LogDebug( _log.LogDebug(
"Sending {PayloadLength} bytes to [{Endpoint}] ({ConnectionAttempts}/{AllowedConnectionFailures})", "Sending {PayloadLength} bytes to [{Endpoint}] ({ConnectionAttempts}/{AllowedConnectionFailures}) parameters {Payload}",
payload.Length, Endpoint, connectionState.ConnectionAttempts, _retryAttempts); payload.Length, Endpoint, connectionState.ConnectionAttempts, _retryAttempts, parameters);
try try
{ {
connectionState.LastQuery = DateTime.Now; connectionState.LastQuery = DateTime.Now;
var timeout = _parser.OverrideTimeoutForCommand(parameters); var retryTimeout = StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts);
waitForResponse = waitForResponse && timeout.HasValue; var overrideTimeout = _parser.OverrideTimeoutForCommand(parameters);
response = await SendPayloadAsync(payload, waitForResponse, var maxTimeout = !overrideTimeout.HasValue || overrideTimeout == TimeSpan.Zero
timeout ?? TimeSpan.Zero, token); ? retryTimeout
: overrideTimeout.Value;
using var internalTokenSource = new CancellationTokenSource(maxTimeout);
using var chainedTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(token, internalTokenSource.Token);
if (connectionState.ConnectionAttempts > 1)
{
using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogInformation(
"Retrying RCon message ({ConnectionAttempts}/{AllowedConnectionFailures} attempts, {Timeout}ms timeout) with parameters {Payload}",
connectionState.ConnectionAttempts, _retryAttempts,
maxTimeout.TotalMilliseconds, parameters);
}
}
waitForResponse = waitForResponse && overrideTimeout.HasValue;
response = await SendPayloadAsync(socket, payload, waitForResponse, chainedTokenSource.Token);
if ((response?.Length == 0 || response[0].Length == 0) && waitForResponse) if ((response?.Length == 0 || response[0].Length == 0) && waitForResponse)
{ {
@ -273,26 +254,23 @@ namespace Integrations.Cod
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
_log.LogDebug("OperationCanceledException when waiting for payload send to complete");
// if we timed out due to the cancellation token, // if we timed out due to the cancellation token,
// we don't want to count that as an attempt // we don't want to count that as an attempt
_log.LogDebug("OperationCanceledException when waiting for payload send to complete"); if (token.IsCancellationRequested)
connectionState.ConnectionAttempts = 0;
}
catch
{
// we want to retry with a delay
if (connectionState.ConnectionAttempts < _retryAttempts)
{ {
try if (connectionState.ConnectionAttempts > 0)
{ {
await Task.Delay(StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), token); connectionState.ConnectionAttempts--;
}
catch (OperationCanceledException)
{
_log.LogDebug("OperationCancelled while waiting for retry");
throw;
} }
throw new RConException("Timed out waiting on retry delay for RCon socket",
token.IsCancellationRequested);
}
if (connectionState.ConnectionAttempts < _retryAttempts)
{
goto retrySend; goto retrySend;
} }
@ -306,25 +284,24 @@ namespace Integrations.Cod
connectionState.ConnectionAttempts = 0; connectionState.ConnectionAttempts = 0;
throw new NetworkException("Reached maximum retry attempts to send RCon data to server"); throw new NetworkException("Reached maximum retry attempts to send RCon data to server");
} }
finally catch (Exception ex)
{ {
try _log.LogDebug(ex, "RCon Exception");
{
if (connectionState.OnSentData.CurrentCount == 0)
{
connectionState.OnSentData.Release();
}
if (connectionState.OnReceivedData.CurrentCount == 0) if (connectionState.ConnectionAttempts < _retryAttempts)
{
connectionState.OnReceivedData.Release();
}
}
catch
{ {
// ignored because we can have the socket operation cancelled (which releases the semaphore) but goto retrySend;
// this thread is not notified because it's an event
} }
using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogWarning(
"Made {ConnectionAttempts} attempts to send RCon data to server, but received no response",
connectionState.ConnectionAttempts);
}
connectionState.ConnectionAttempts = 0;
throw new NetworkException("Reached maximum retry attempts to send RCon data to server");
} }
} }
@ -340,6 +317,81 @@ namespace Integrations.Cod
? ReassembleSegmentedStatus(response) ? ReassembleSegmentedStatus(response)
: RecombineMessages(response); : RecombineMessages(response);
var validatedResponse = ValidateResponse(type, responseString);
return validatedResponse;
}
private async Task<byte[][]> SendPayloadAsync(Socket rconSocket, byte[] payload, bool waitForResponse,
CancellationToken token = default)
{
var connectionState = ActiveQueries[Endpoint];
if (rconSocket is null)
{
_log.LogDebug("Invalid state");
throw new InvalidOperationException("State is not valid for socket operation");
}
var sentByteCount = await rconSocket.SendToAsync(payload, SocketFlags.None, Endpoint, token);
var complete = sentByteCount == payload.Length;
if (!complete)
{
using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogWarning("Could not send data to remote RCon socket on attempt #{ConnectionAttempts}",
connectionState.ConnectionAttempts);
}
rconSocket.Close();
throw new NetworkException("Could not send data to remote RCon socket", rconSocket);
}
if (!waitForResponse)
{
return Array.Empty<byte[]>();
}
_log.LogDebug("Waiting to asynchronously receive data on attempt #{ConnectionAttempts}",
connectionState.ConnectionAttempts);
await ReceiveAndStoreSocketData(rconSocket, token, connectionState);
if (_parser.GameName == Server.Game.IW3)
{
await Task.Delay(100, token); // CoD4x shenanigans
}
while (rconSocket.Available > 0)
{
await ReceiveAndStoreSocketData(rconSocket, token, connectionState);
}
rconSocket.Close();
return GetResponseData(connectionState);
}
private async Task ReceiveAndStoreSocketData(Socket rconSocket, CancellationToken token,
ConnectionState connectionState)
{
var result = await rconSocket.ReceiveFromAsync(connectionState.ReceiveBuffer,
SocketFlags.None, Endpoint, token);
if (result.ReceivedBytes == 0)
{
return;
}
var storageBuffer = new byte[result.ReceivedBytes];
Array.Copy(connectionState.ReceiveBuffer, storageBuffer, storageBuffer.Length);
connectionState.ReceivedBytes.Add(storageBuffer);
}
#region Helpers
private string[] ValidateResponse(StaticHelpers.QueryType type, string responseString)
{
// note: not all games respond if the password is wrong or not set // note: not all games respond if the password is wrong or not set
if (responseString.Contains("Invalid password", StringComparison.InvariantCultureIgnoreCase) || if (responseString.Contains("Invalid password", StringComparison.InvariantCultureIgnoreCase) ||
responseString.Contains("rconpassword")) responseString.Contains("rconpassword"))
@ -363,21 +415,19 @@ namespace Integrations.Cod
? _config.CommandPrefixes.RconGetInfoResponseHeader ? _config.CommandPrefixes.RconGetInfoResponseHeader
: responseHeaderMatch); : responseHeaderMatch);
if (headerSplit.Length != 2) if (headerSplit.Length == 2)
{ {
using (LogContext.PushProperty("Server", Endpoint.ToString())) return headerSplit.Last().Split(new[] { '\n' }, StringSplitOptions.RemoveEmptyEntries)
{ .Select(line => line.StartsWith("^7") ? line[2..] : line).ToArray();
_log.LogWarning("Invalid response header from server. Expected {Expected}, but got {Response}",
_config.CommandPrefixes.RConResponse, headerSplit.FirstOrDefault());
}
throw new RConException("Unexpected response header from server");
} }
var splitResponse = headerSplit.Last().Split(new[] { '\n' }, StringSplitOptions.RemoveEmptyEntries) using (LogContext.PushProperty("Server", Endpoint.ToString()))
.Select(line => line.StartsWith("^7") ? line[2..] : line).ToArray(); {
_log.LogWarning("Invalid response header from server. Expected {Expected}, but got {Response}",
return splitResponse; _config.CommandPrefixes.RConResponse, headerSplit.FirstOrDefault());
}
throw new RConException("Unexpected response header from server");
} }
/// <summary> /// <summary>
@ -408,7 +458,7 @@ namespace Integrations.Cod
return string.Join("", splitStatusStrings); return string.Join("", splitStatusStrings);
} }
/// <summary> /// <summary>
/// Recombines multiple game messages into one /// Recombines multiple game messages into one
/// </summary> /// </summary>
@ -437,269 +487,11 @@ namespace Integrations.Cod
return builder.ToString(); return builder.ToString();
} }
private async Task<byte[][]> SendPayloadAsync(byte[] payload, bool waitForResponse, TimeSpan overrideTimeout,
CancellationToken token = default)
{
var connectionState = ActiveQueries[Endpoint];
var rconSocket = ((ConnectionUserToken)connectionState.SendEventArgs.UserToken)?.Socket;
if (rconSocket is null)
{
_log.LogDebug("Invalid state");
throw new InvalidOperationException("State is not valid for socket operation");
}
if (connectionState.ReceiveEventArgs.RemoteEndPoint == null &&
connectionState.SendEventArgs.RemoteEndPoint == null)
{
// setup the event handlers only once because we're reusing the event args
connectionState.SendEventArgs.Completed += OnDataSent;
connectionState.ReceiveEventArgs.Completed += OnDataReceived;
connectionState.ReceiveEventArgs.UserToken = connectionState.SendEventArgs.UserToken;
connectionState.SendEventArgs.RemoteEndPoint = Endpoint;
connectionState.ReceiveEventArgs.RemoteEndPoint = Endpoint;
connectionState.ReceiveEventArgs.DisconnectReuseSocket = true;
connectionState.SendEventArgs.DisconnectReuseSocket = true;
}
if (connectionState.ReceiveEventArgs.UserToken is ConnectionUserToken { CancellationToken.IsCancellationRequested: true })
{
// after a graceful restart we need to reset the receive user token as the cancellation has been updated
connectionState.ReceiveEventArgs.UserToken = connectionState.SendEventArgs.UserToken;
}
connectionState.SendEventArgs.SetBuffer(payload);
// send the data to the server
var sendDataPending = rconSocket.SendToAsync(connectionState.SendEventArgs);
if (sendDataPending)
{
// the send has not been completed asynchronously
// this really shouldn't ever happen because it's UDP
var complete = await connectionState.OnSentData.WaitAsync(StaticHelpers.SocketTimeout(4), token);
if (!complete)
{
using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogWarning("Socket timed out while sending RCon data on attempt {Attempt}",
connectionState.ConnectionAttempts);
}
rconSocket.Close();
throw new NetworkException("Timed out sending RCon data", rconSocket);
}
}
if (!waitForResponse)
{
return Array.Empty<byte[]>();
}
connectionState.ReceiveEventArgs.SetBuffer(connectionState.ReceiveBuffer);
// get our response back
var receiveDataPending = rconSocket.ReceiveFromAsync(connectionState.ReceiveEventArgs);
if (receiveDataPending)
{
_log.LogDebug("Waiting to asynchronously receive data on attempt #{ConnectionAttempts}",
connectionState.ConnectionAttempts);
var completed = false;
try
{
completed = await connectionState.OnReceivedData.WaitAsync(
new[]
{
StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts),
overrideTimeout
}.Max(), token);
}
catch (OperationCanceledException)
{
// ignored
}
if (!completed)
{
if (connectionState.ConnectionAttempts > 1) // this reduces some spam for unstable connections
{
using (LogContext.PushProperty("Server", Endpoint.ToString()))
{
_log.LogWarning(
"Socket timed out while waiting for RCon response on attempt {Attempt} with timeout delay of {Timeout}",
connectionState.ConnectionAttempts,
StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts));
}
}
rconSocket.Close();
_log.LogDebug("OnDataReceived did not complete in allocated time");
throw new NetworkException("Timed out receiving RCon response", rconSocket);
}
}
rconSocket.Close();
return GetResponseData(connectionState);
}
private static byte[][] GetResponseData(ConnectionState connectionState) private static byte[][] GetResponseData(ConnectionState connectionState)
{ {
var responseList = new List<byte[]>(); return connectionState.ReceivedBytes.ToArray();
var totalBytesRead = 0;
foreach (var bytesRead in connectionState.BytesReadPerSegment)
{
responseList.Add(connectionState.ReceiveBuffer
.Skip(totalBytesRead)
.Take(bytesRead)
.ToArray());
totalBytesRead += bytesRead;
}
return responseList.ToArray();
}
private void OnDataReceived(object sender, SocketAsyncEventArgs e)
{
_log.LogDebug("Read {BytesTransferred} bytes from {Endpoint}", e.BytesTransferred,
e.RemoteEndPoint?.ToString());
// this occurs when we close the socket
if (e.BytesTransferred == 0)
{
_log.LogDebug("No bytes were transmitted so the connection was probably closed");
var semaphore = ActiveQueries[Endpoint].OnReceivedData;
try
{
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
}
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
}
return;
}
var state = ActiveQueries[Endpoint];
var cancellationRequested = ((ConnectionUserToken)e.UserToken)?.CancellationToken.IsCancellationRequested ??
false;
if (sender is not Socket sock || cancellationRequested)
{
var semaphore = ActiveQueries[Endpoint].OnReceivedData;
try
{
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
}
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
}
return;
}
state.BytesReadPerSegment.Add(e.BytesTransferred);
// I don't even want to know why this works for getting more data from Cod4x
// but I'm leaving it in here as long as it doesn't break anything.
// it's very stupid...
Thread.Sleep(150);
try
{
var totalBytesTransferred = e.BytesTransferred;
_log.LogDebug("{Total} total bytes transferred with {Available} bytes remaining", totalBytesTransferred,
sock.Available);
// we still have available data so the payload was segmented
while (sock.Available > 0)
{
_log.LogDebug("{Available} more bytes to be read", sock.Available);
var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length;
if (bufferSpaceAvailable >= 0)
{
_log.LogWarning(
"Not enough buffer space to store incoming data {BytesNeeded} additional bytes required",
bufferSpaceAvailable);
continue;
}
state.ReceiveEventArgs.SetBuffer(state.ReceiveBuffer, totalBytesTransferred, sock.Available);
if (sock.ReceiveAsync(state.ReceiveEventArgs))
{
_log.LogDebug("Remaining bytes are async");
continue;
}
_log.LogDebug("Read {BytesTransferred} synchronous bytes from {Endpoint}",
state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint?.ToString());
// we need to increment this here because the callback isn't executed if there's no pending IO
state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred);
totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred;
}
}
catch (ObjectDisposedException)
{
_log.LogDebug("Socket was disposed while receiving data");
}
finally
{
var semaphore = ActiveQueries[Endpoint].OnReceivedData;
try
{
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
}
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
}
}
}
private void OnDataSent(object sender, SocketAsyncEventArgs e)
{
_log.LogDebug("Sent {ByteCount} bytes to {Endpoint}", e.Buffer?.Length,
e.ConnectSocket?.RemoteEndPoint?.ToString());
var semaphore = ActiveQueries[Endpoint].OnSentData;
try
{
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
}
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
}
} }
#endregion
} }
} }

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Net.Sockets;
using System.Threading; using System.Threading;
namespace Integrations.Cod namespace Integrations.Cod
@ -13,26 +12,13 @@ namespace Integrations.Cod
~ConnectionState() ~ConnectionState()
{ {
OnComplete.Dispose(); OnComplete.Dispose();
OnSentData.Dispose();
OnReceivedData.Dispose();
} }
public int ConnectionAttempts { get; set; } public int ConnectionAttempts { get; set; }
private const int BufferSize = 16384; private const int BufferSize = 16384;
public readonly byte[] ReceiveBuffer = new byte[BufferSize]; public readonly byte[] ReceiveBuffer = new byte[BufferSize];
public readonly SemaphoreSlim OnComplete = new(1, 1); public readonly SemaphoreSlim OnComplete = new(1, 1);
public readonly SemaphoreSlim OnSentData = new(1, 1); public List<byte[]> ReceivedBytes { get; } = new();
public readonly SemaphoreSlim OnReceivedData = new (1, 1);
public List<int> BytesReadPerSegment { get; set; } = new();
public SocketAsyncEventArgs SendEventArgs { get; set; } = new();
public SocketAsyncEventArgs ReceiveEventArgs { get; set; } = new();
public DateTime LastQuery { get; set; } = DateTime.Now; public DateTime LastQuery { get; set; } = DateTime.Now;
} }
internal class ConnectionUserToken
{
public Socket Socket { get; set; }
public CancellationToken CancellationToken { get; set; }
}
} }

View File

@ -4,8 +4,15 @@ namespace SharedLibraryCore.Exceptions
{ {
public class RConException : Exception public class RConException : Exception
{ {
public bool IsOperationCancelled { get; }
public RConException(string message) : base(message) public RConException(string message) : base(message)
{ {
} }
public RConException(string message, bool isOperationCancelled) : base(message)
{
IsOperationCancelled = isOperationCancelled;
}
} }
} }