more cod rcon tweaks

This commit is contained in:
RaidMax 2022-03-03 08:54:17 -06:00
parent ef70496546
commit 59ca399045

View File

@ -27,8 +27,8 @@ namespace Integrations.Cod
public IPEndPoint Endpoint { get; } public IPEndPoint Endpoint { get; }
public string RConPassword { get; } public string RConPassword { get; }
private IRConParser parser; private IRConParser _parser;
private IRConParserConfiguration config; private IRConParserConfiguration _config;
private readonly ILogger _log; private readonly ILogger _log;
private readonly Encoding _gameEncoding; private readonly Encoding _gameEncoding;
private readonly int _retryAttempts; private readonly int _retryAttempts;
@ -44,8 +44,8 @@ namespace Integrations.Cod
public void SetConfiguration(IRConParser parser) public void SetConfiguration(IRConParser parser)
{ {
this.parser = parser; this._parser = parser;
config = parser.Configuration; _config = parser.Configuration;
} }
public async Task<string[]> SendQueryAsync(StaticHelpers.QueryType type, string parameters = "", public async Task<string[]> SendQueryAsync(StaticHelpers.QueryType type, string parameters = "",
@ -56,12 +56,12 @@ namespace Integrations.Cod
private async Task<string[]> SendQueryAsyncInternal(StaticHelpers.QueryType type, string parameters = "", CancellationToken token = default) private async Task<string[]> SendQueryAsyncInternal(StaticHelpers.QueryType type, string parameters = "", CancellationToken token = default)
{ {
if (!ActiveQueries.ContainsKey(this.Endpoint)) if (!ActiveQueries.ContainsKey(Endpoint))
{ {
ActiveQueries.TryAdd(this.Endpoint, new ConnectionState()); ActiveQueries.TryAdd(Endpoint, new ConnectionState());
} }
var connectionState = ActiveQueries[this.Endpoint]; var connectionState = ActiveQueries[Endpoint];
_log.LogDebug("Waiting for semaphore to be released [{Endpoint}]", Endpoint); _log.LogDebug("Waiting for semaphore to be released [{Endpoint}]", Endpoint);
@ -77,33 +77,30 @@ namespace Integrations.Cod
var timeSinceLastQuery = (DateTime.Now - connectionState.LastQuery).TotalMilliseconds; var timeSinceLastQuery = (DateTime.Now - connectionState.LastQuery).TotalMilliseconds;
if (timeSinceLastQuery < config.FloodProtectInterval) if (timeSinceLastQuery < _config.FloodProtectInterval)
{ {
try try
{ {
await Task.Delay(config.FloodProtectInterval - (int)timeSinceLastQuery, token); await Task.Delay(_config.FloodProtectInterval - (int)timeSinceLastQuery, token);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
throw new RConException("Timed out waiting for flood protect to expire"); throw new RConException("Timed out waiting for flood protect to expire");
} }
finally finally
{ {
if (connectionState.OnComplete.CurrentCount == 0) if (connectionState.OnComplete.CurrentCount == 0)
{ {
connectionState.OnComplete.Release(1); connectionState.OnComplete.Release();
} }
} }
} }
connectionState.LastQuery = DateTime.Now;
_log.LogDebug("Semaphore has been released [{Endpoint}]", Endpoint); _log.LogDebug("Semaphore has been released [{Endpoint}]", Endpoint);
_log.LogDebug("Query {@QueryInfo}", new { endpoint=Endpoint.ToString(), type, parameters }); _log.LogDebug("Query {@QueryInfo}", new { endpoint=Endpoint.ToString(), type, parameters });
byte[] payload = null; byte[] payload = null;
var waitForResponse = config.WaitForResponse; var waitForResponse = _config.WaitForResponse;
string ConvertEncoding(string text) string ConvertEncoding(string text)
{ {
@ -121,30 +118,30 @@ namespace Integrations.Cod
case StaticHelpers.QueryType.GET_DVAR: case StaticHelpers.QueryType.GET_DVAR:
waitForResponse |= true; waitForResponse |= true;
payload = string payload = string
.Format(config.CommandPrefixes.RConGetDvar, convertedRConPassword, .Format(_config.CommandPrefixes.RConGetDvar, convertedRConPassword,
convertedParameters + '\0').Select(Convert.ToByte).ToArray(); convertedParameters + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.SET_DVAR: case StaticHelpers.QueryType.SET_DVAR:
payload = string payload = string
.Format(config.CommandPrefixes.RConSetDvar, convertedRConPassword, .Format(_config.CommandPrefixes.RConSetDvar, convertedRConPassword,
convertedParameters + '\0').Select(Convert.ToByte).ToArray(); convertedParameters + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.COMMAND: case StaticHelpers.QueryType.COMMAND:
payload = string payload = string
.Format(config.CommandPrefixes.RConCommand, convertedRConPassword, .Format(_config.CommandPrefixes.RConCommand, convertedRConPassword,
convertedParameters + '\0').Select(Convert.ToByte).ToArray(); convertedParameters + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.GET_STATUS: case StaticHelpers.QueryType.GET_STATUS:
waitForResponse |= true; waitForResponse |= true;
payload = (config.CommandPrefixes.RConGetStatus + '\0').Select(Convert.ToByte).ToArray(); payload = (_config.CommandPrefixes.RConGetStatus + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.GET_INFO: case StaticHelpers.QueryType.GET_INFO:
waitForResponse |= true; waitForResponse |= true;
payload = (config.CommandPrefixes.RConGetInfo + '\0').Select(Convert.ToByte).ToArray(); payload = (_config.CommandPrefixes.RConGetInfo + '\0').Select(Convert.ToByte).ToArray();
break; break;
case StaticHelpers.QueryType.COMMAND_STATUS: case StaticHelpers.QueryType.COMMAND_STATUS:
waitForResponse |= true; waitForResponse |= true;
payload = string.Format(config.CommandPrefixes.RConCommand, convertedRConPassword, "status\0") payload = string.Format(_config.CommandPrefixes.RConCommand, convertedRConPassword, "status\0")
.Select(Convert.ToByte).ToArray(); .Select(Convert.ToByte).ToArray();
break; break;
} }
@ -157,7 +154,7 @@ namespace Integrations.Cod
using (LogContext.PushProperty("Server", Endpoint.ToString())) using (LogContext.PushProperty("Server", Endpoint.ToString()))
{ {
_log.LogError(ex, "Could not convert RCon data payload to desired encoding {encoding} {params}", _log.LogError(ex, "Could not convert RCon data payload to desired encoding {Encoding} {Params}",
_gameEncoding.EncodingName, parameters); _gameEncoding.EncodingName, parameters);
} }
@ -168,7 +165,7 @@ namespace Integrations.Cod
{ {
if (connectionState.OnComplete.CurrentCount == 0) if (connectionState.OnComplete.CurrentCount == 0)
{ {
connectionState.OnComplete.Release(1); connectionState.OnComplete.Release();
} }
} }
@ -192,9 +189,6 @@ namespace Integrations.Cod
ExclusiveAddressUse = true, ExclusiveAddressUse = true,
}) })
{ {
connectionState.SendEventArgs.UserToken = socket;
connectionState.ConnectionAttempts++;
// wait for send to complete // wait for send to complete
try try
{ {
@ -208,7 +202,7 @@ namespace Integrations.Cod
{ {
if (connectionState.OnComplete.CurrentCount == 0 ) if (connectionState.OnComplete.CurrentCount == 0 )
{ {
connectionState.OnComplete.Release(1); connectionState.OnComplete.Release();
} }
} }
@ -225,15 +219,18 @@ namespace Integrations.Cod
{ {
if (connectionState.OnComplete.CurrentCount == 0 ) if (connectionState.OnComplete.CurrentCount == 0 )
{ {
connectionState.OnComplete.Release(1); connectionState.OnComplete.Release();
} }
if (connectionState.OnSentData.CurrentCount == 0) if (connectionState.OnSentData.CurrentCount == 0)
{ {
connectionState.OnSentData.Release(1); connectionState.OnSentData.Release();
} }
} }
connectionState.SendEventArgs.UserToken = socket;
connectionState.ConnectionAttempts++;
connectionState.BytesReadPerSegment.Clear(); connectionState.BytesReadPerSegment.Clear();
var exceptionCaught = false; var exceptionCaught = false;
@ -242,16 +239,10 @@ namespace Integrations.Cod
try try
{ {
try connectionState.LastQuery = DateTime.Now;
{ response = await SendPayloadAsync(payload, waitForResponse,
response = await SendPayloadAsync(payload, waitForResponse, _parser.OverrideTimeoutForCommand(parameters), token);
parser.OverrideTimeoutForCommand(parameters), token);
}
catch (OperationCanceledException)
{
// ignored
}
if ((response?.Length == 0 || response[0].Length == 0) && waitForResponse) if ((response?.Length == 0 || response[0].Length == 0) && waitForResponse)
{ {
throw new RConException("Expected response but got 0 bytes back"); throw new RConException("Expected response but got 0 bytes back");
@ -260,6 +251,12 @@ namespace Integrations.Cod
connectionState.ConnectionAttempts = 0; connectionState.ConnectionAttempts = 0;
} }
catch (OperationCanceledException)
{
// if we timed out due to the cancellation token,
// we don't want to count that as an attempt
connectionState.ConnectionAttempts = 0;
}
catch catch
{ {
// we want to retry with a delay // we want to retry with a delay
@ -268,8 +265,10 @@ namespace Integrations.Cod
exceptionCaught = true; exceptionCaught = true;
try try
{ {
await Task.Delay(StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), token); await Task.Delay(
token != CancellationToken.None
? StaticHelpers.SocketTimeout(100) // if using cancellation token we don't care about attempt count
: StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), token);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -288,13 +287,12 @@ namespace Integrations.Cod
connectionState.ConnectionAttempts = 0; connectionState.ConnectionAttempts = 0;
throw new NetworkException("Reached maximum retry attempts to send RCon data to server"); throw new NetworkException("Reached maximum retry attempts to send RCon data to server");
} }
finally finally
{ {
// we don't want to release if we're going to retry the query // we don't want to release if we're going to retry the query
if (connectionState.OnComplete.CurrentCount == 0 && !exceptionCaught) if (connectionState.OnComplete.CurrentCount == 0 && !exceptionCaught)
{ {
connectionState.OnComplete.Release(1); connectionState.OnComplete.Release();
} }
if (connectionState.OnSentData.CurrentCount == 0) if (connectionState.OnSentData.CurrentCount == 0)
@ -330,20 +328,20 @@ namespace Integrations.Cod
throw new RConException(Utilities.CurrentLocalization.LocalizationIndex["SERVER_ERROR_RCON_NOTSET"]); throw new RConException(Utilities.CurrentLocalization.LocalizationIndex["SERVER_ERROR_RCON_NOTSET"]);
} }
if (responseString.Contains(config.ServerNotRunningResponse)) if (responseString.Contains(_config.ServerNotRunningResponse))
{ {
throw new ServerException(Utilities.CurrentLocalization.LocalizationIndex["SERVER_ERROR_NOT_RUNNING"].FormatExt(Endpoint.ToString())); throw new ServerException(Utilities.CurrentLocalization.LocalizationIndex["SERVER_ERROR_NOT_RUNNING"].FormatExt(Endpoint.ToString()));
} }
var responseHeaderMatch = Regex.Match(responseString, config.CommandPrefixes.RConResponse).Value; var responseHeaderMatch = Regex.Match(responseString, _config.CommandPrefixes.RConResponse).Value;
var headerSplit = responseString.Split(type == StaticHelpers.QueryType.GET_INFO ? config.CommandPrefixes.RconGetInfoResponseHeader : responseHeaderMatch); var headerSplit = responseString.Split(type == StaticHelpers.QueryType.GET_INFO ? _config.CommandPrefixes.RconGetInfoResponseHeader : responseHeaderMatch);
if (headerSplit.Length != 2) if (headerSplit.Length != 2)
{ {
using (LogContext.PushProperty("Server", Endpoint.ToString())) using (LogContext.PushProperty("Server", Endpoint.ToString()))
{ {
_log.LogWarning("Invalid response header from server. Expected {Expected}, but got {Response}", _log.LogWarning("Invalid response header from server. Expected {Expected}, but got {Response}",
config.CommandPrefixes.RConResponse, headerSplit.FirstOrDefault()); _config.CommandPrefixes.RConResponse, headerSplit.FirstOrDefault());
} }
throw new RConException("Unexpected response header from server"); throw new RConException("Unexpected response header from server");
@ -359,14 +357,14 @@ namespace Integrations.Cod
/// </summary> /// </summary>
/// <param name="segments">array of segmented byte arrays</param> /// <param name="segments">array of segmented byte arrays</param>
/// <returns></returns> /// <returns></returns>
public string ReassembleSegmentedStatus(byte[][] segments) private string ReassembleSegmentedStatus(byte[][] segments)
{ {
var splitStatusStrings = new List<string>(); var splitStatusStrings = new List<string>();
foreach (byte[] segment in segments) foreach (var segment in segments)
{ {
string responseString = _gameEncoding.GetString(segment, 0, segment.Length); var responseString = _gameEncoding.GetString(segment, 0, segment.Length);
var statusHeaderMatch = config.StatusHeader.PatternMatcher.Match(responseString); var statusHeaderMatch = _config.StatusHeader.PatternMatcher.Match(responseString);
if (statusHeaderMatch.Success) if (statusHeaderMatch.Success)
{ {
splitStatusStrings.Insert(0, responseString.TrimEnd('\0')); splitStatusStrings.Insert(0, responseString.TrimEnd('\0'));
@ -374,7 +372,7 @@ namespace Integrations.Cod
else else
{ {
splitStatusStrings.Add(responseString.Replace(config.CommandPrefixes.RConResponse, "").TrimEnd('\0')); splitStatusStrings.Add(responseString.Replace(_config.CommandPrefixes.RConResponse, "").TrimEnd('\0'));
} }
} }
@ -386,9 +384,9 @@ namespace Integrations.Cod
/// </summary> /// </summary>
/// <param name="payload"></param> /// <param name="payload"></param>
/// <returns></returns> /// <returns></returns>
private string RecombineMessages(byte[][] payload) private string RecombineMessages(IReadOnlyList<byte[]> payload)
{ {
if (payload.Length == 1) if (payload.Count == 1)
{ {
return _gameEncoding.GetString(payload[0]).TrimEnd('\n') + '\n'; return _gameEncoding.GetString(payload[0]).TrimEnd('\n') + '\n';
} }
@ -396,12 +394,12 @@ namespace Integrations.Cod
else else
{ {
var builder = new StringBuilder(); var builder = new StringBuilder();
for (int i = 0; i < payload.Length; i++) for (int i = 0; i < payload.Count; i++)
{ {
string message = _gameEncoding.GetString(payload[i]).TrimEnd('\n') + '\n'; string message = _gameEncoding.GetString(payload[i]).TrimEnd('\n') + '\n';
if (i > 0) if (i > 0)
{ {
message = message.Replace(config.CommandPrefixes.RConResponse, ""); message = message.Replace(_config.CommandPrefixes.RConResponse, "");
} }
builder.Append(message); builder.Append(message);
} }
@ -410,11 +408,17 @@ namespace Integrations.Cod
} }
} }
private async Task<byte[][]> SendPayloadAsync(byte[] payload, bool waitForResponse, TimeSpan overrideTimeout, CancellationToken token = default) private async Task<byte[][]> SendPayloadAsync(byte[] payload, bool waitForResponse, TimeSpan overrideTimeout,
CancellationToken token = default)
{ {
var connectionState = ActiveQueries[this.Endpoint]; var connectionState = ActiveQueries[Endpoint];
var rconSocket = (Socket)connectionState.SendEventArgs.UserToken; var rconSocket = (Socket)connectionState.SendEventArgs.UserToken;
if (rconSocket is null)
{
throw new InvalidOperationException("State is not valid for socket operation");
}
if (connectionState.ReceiveEventArgs.RemoteEndPoint == null && if (connectionState.ReceiveEventArgs.RemoteEndPoint == null &&
connectionState.SendEventArgs.RemoteEndPoint == null) connectionState.SendEventArgs.RemoteEndPoint == null)
{ {
@ -458,7 +462,7 @@ namespace Integrations.Cod
if (connectionState.OnSentData.CurrentCount == 0) if (connectionState.OnSentData.CurrentCount == 0)
{ {
connectionState.OnSentData.Release(1); connectionState.OnSentData.Release();
} }
throw new NetworkException("Timed out sending RCon data", rconSocket); throw new NetworkException("Timed out sending RCon data", rconSocket);
@ -502,7 +506,7 @@ namespace Integrations.Cod
using (LogContext.PushProperty("Server", Endpoint.ToString())) using (LogContext.PushProperty("Server", Endpoint.ToString()))
{ {
_log.LogWarning( _log.LogWarning(
"Socket timed out while waiting for RCon response on attempt {attempt} with timeout delay of {timeout}", "Socket timed out while waiting for RCon response on attempt {Attempt} with timeout delay of {Timeout}",
connectionState.ConnectionAttempts, connectionState.ConnectionAttempts,
StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts)); StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts));
} }
@ -510,7 +514,7 @@ namespace Integrations.Cod
if (connectionState.OnReceivedData.CurrentCount == 0) if (connectionState.OnReceivedData.CurrentCount == 0)
{ {
connectionState.OnReceivedData.Release(1); connectionState.OnReceivedData.Release();
} }
rconSocket.Close(); rconSocket.Close();
@ -542,34 +546,52 @@ namespace Integrations.Cod
private void OnDataReceived(object sender, SocketAsyncEventArgs e) private void OnDataReceived(object sender, SocketAsyncEventArgs e)
{ {
_log.LogDebug("Read {bytesTransferred} bytes from {endpoint}", e.BytesTransferred, e.RemoteEndPoint); _log.LogDebug("Read {BytesTransferred} bytes from {Endpoint}", e.BytesTransferred, e.RemoteEndPoint?.ToString());
// this occurs when we close the socket // this occurs when we close the socket
if (e.BytesTransferred == 0) if (e.BytesTransferred == 0)
{ {
_log.LogDebug("No bytes were transmitted so the connection was probably closed"); _log.LogDebug("No bytes were transmitted so the connection was probably closed");
var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; var semaphore = ActiveQueries[Endpoint].OnReceivedData;
if (semaphore.CurrentCount == 0)
try
{ {
semaphore.Release(); if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
} }
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
}
return; return;
} }
if (sender is not Socket sock) if (sender is not Socket sock)
{ {
var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; var semaphore = ActiveQueries[Endpoint].OnReceivedData;
if (semaphore.CurrentCount == 0)
try
{ {
semaphore.Release(); if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
} }
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
}
return; return;
} }
var state = ActiveQueries[this.Endpoint]; var state = ActiveQueries[Endpoint];
state.BytesReadPerSegment.Add(e.BytesTransferred); state.BytesReadPerSegment.Add(e.BytesTransferred);
// I don't even want to know why this works for getting more data from Cod4x // I don't even want to know why this works for getting more data from Cod4x
@ -580,18 +602,18 @@ namespace Integrations.Cod
try try
{ {
var totalBytesTransferred = e.BytesTransferred; var totalBytesTransferred = e.BytesTransferred;
_log.LogDebug("{total} total bytes transferred with {available} bytes remaining", totalBytesTransferred, _log.LogDebug("{Total} total bytes transferred with {Available} bytes remaining", totalBytesTransferred,
sock.Available); sock.Available);
// we still have available data so the payload was segmented // we still have available data so the payload was segmented
while (sock.Available > 0) while (sock.Available > 0)
{ {
_log.LogDebug("{available} more bytes to be read", sock.Available); _log.LogDebug("{Available} more bytes to be read", sock.Available);
var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length; var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length;
if (bufferSpaceAvailable >= 0) if (bufferSpaceAvailable >= 0)
{ {
_log.LogWarning( _log.LogWarning(
"Not enough buffer space to store incoming data {bytesNeeded} additional bytes required", "Not enough buffer space to store incoming data {BytesNeeded} additional bytes required",
bufferSpaceAvailable); bufferSpaceAvailable);
continue; continue;
} }
@ -604,8 +626,8 @@ namespace Integrations.Cod
continue; continue;
} }
_log.LogDebug("Read {bytesTransferred} synchronous bytes from {endpoint}", _log.LogDebug("Read {BytesTransferred} synchronous bytes from {Endpoint}",
state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint); state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint?.ToString());
// we need to increment this here because the callback isn't executed if there's no pending IO // we need to increment this here because the callback isn't executed if there's no pending IO
state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred); state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred);
totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred; totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred;
@ -619,22 +641,38 @@ namespace Integrations.Cod
finally finally
{ {
var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; var semaphore = ActiveQueries[Endpoint].OnReceivedData;
if (semaphore.CurrentCount == 0) try
{ {
semaphore.Release(); if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
}
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
} }
} }
} }
private void OnDataSent(object sender, SocketAsyncEventArgs e) private void OnDataSent(object sender, SocketAsyncEventArgs e)
{ {
_log.LogDebug("Sent {byteCount} bytes to {endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint); _log.LogDebug("Sent {ByteCount} bytes to {Endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint?.ToString());
var semaphore = ActiveQueries[this.Endpoint].OnSentData; var semaphore = ActiveQueries[Endpoint].OnSentData;
if (semaphore.CurrentCount == 0) try
{ {
semaphore.Release(); if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
}
catch
{
// ignored because we can have the socket operation cancelled (which releases the semaphore) but
// this thread is not notified because it's an event
} }
} }
} }