diff --git a/Integrations/Cod/CodRConConnection.cs b/Integrations/Cod/CodRConConnection.cs index 75ce69838..109afbb93 100644 --- a/Integrations/Cod/CodRConConnection.cs +++ b/Integrations/Cod/CodRConConnection.cs @@ -27,8 +27,8 @@ namespace Integrations.Cod public IPEndPoint Endpoint { get; } public string RConPassword { get; } - private IRConParser parser; - private IRConParserConfiguration config; + private IRConParser _parser; + private IRConParserConfiguration _config; private readonly ILogger _log; private readonly Encoding _gameEncoding; private readonly int _retryAttempts; @@ -44,8 +44,8 @@ namespace Integrations.Cod public void SetConfiguration(IRConParser parser) { - this.parser = parser; - config = parser.Configuration; + this._parser = parser; + _config = parser.Configuration; } public async Task SendQueryAsync(StaticHelpers.QueryType type, string parameters = "", @@ -56,12 +56,12 @@ namespace Integrations.Cod private async Task SendQueryAsyncInternal(StaticHelpers.QueryType type, string parameters = "", CancellationToken token = default) { - if (!ActiveQueries.ContainsKey(this.Endpoint)) + if (!ActiveQueries.ContainsKey(Endpoint)) { - ActiveQueries.TryAdd(this.Endpoint, new ConnectionState()); + ActiveQueries.TryAdd(Endpoint, new ConnectionState()); } - var connectionState = ActiveQueries[this.Endpoint]; + var connectionState = ActiveQueries[Endpoint]; _log.LogDebug("Waiting for semaphore to be released [{Endpoint}]", Endpoint); @@ -77,33 +77,30 @@ namespace Integrations.Cod var timeSinceLastQuery = (DateTime.Now - connectionState.LastQuery).TotalMilliseconds; - if (timeSinceLastQuery < config.FloodProtectInterval) + if (timeSinceLastQuery < _config.FloodProtectInterval) { try { - await Task.Delay(config.FloodProtectInterval - (int)timeSinceLastQuery, token); + await Task.Delay(_config.FloodProtectInterval - (int)timeSinceLastQuery, token); } catch (OperationCanceledException) { throw new RConException("Timed out waiting for flood protect to expire"); } - finally { if (connectionState.OnComplete.CurrentCount == 0) { - connectionState.OnComplete.Release(1); + connectionState.OnComplete.Release(); } } } - connectionState.LastQuery = DateTime.Now; - _log.LogDebug("Semaphore has been released [{Endpoint}]", Endpoint); _log.LogDebug("Query {@QueryInfo}", new { endpoint=Endpoint.ToString(), type, parameters }); byte[] payload = null; - var waitForResponse = config.WaitForResponse; + var waitForResponse = _config.WaitForResponse; string ConvertEncoding(string text) { @@ -121,30 +118,30 @@ namespace Integrations.Cod case StaticHelpers.QueryType.GET_DVAR: waitForResponse |= true; payload = string - .Format(config.CommandPrefixes.RConGetDvar, convertedRConPassword, + .Format(_config.CommandPrefixes.RConGetDvar, convertedRConPassword, convertedParameters + '\0').Select(Convert.ToByte).ToArray(); break; case StaticHelpers.QueryType.SET_DVAR: payload = string - .Format(config.CommandPrefixes.RConSetDvar, convertedRConPassword, + .Format(_config.CommandPrefixes.RConSetDvar, convertedRConPassword, convertedParameters + '\0').Select(Convert.ToByte).ToArray(); break; case StaticHelpers.QueryType.COMMAND: payload = string - .Format(config.CommandPrefixes.RConCommand, convertedRConPassword, + .Format(_config.CommandPrefixes.RConCommand, convertedRConPassword, convertedParameters + '\0').Select(Convert.ToByte).ToArray(); break; case StaticHelpers.QueryType.GET_STATUS: waitForResponse |= true; - payload = (config.CommandPrefixes.RConGetStatus + '\0').Select(Convert.ToByte).ToArray(); + payload = (_config.CommandPrefixes.RConGetStatus + '\0').Select(Convert.ToByte).ToArray(); break; case StaticHelpers.QueryType.GET_INFO: waitForResponse |= true; - payload = (config.CommandPrefixes.RConGetInfo + '\0').Select(Convert.ToByte).ToArray(); + payload = (_config.CommandPrefixes.RConGetInfo + '\0').Select(Convert.ToByte).ToArray(); break; case StaticHelpers.QueryType.COMMAND_STATUS: waitForResponse |= true; - payload = string.Format(config.CommandPrefixes.RConCommand, convertedRConPassword, "status\0") + payload = string.Format(_config.CommandPrefixes.RConCommand, convertedRConPassword, "status\0") .Select(Convert.ToByte).ToArray(); break; } @@ -157,7 +154,7 @@ namespace Integrations.Cod using (LogContext.PushProperty("Server", Endpoint.ToString())) { - _log.LogError(ex, "Could not convert RCon data payload to desired encoding {encoding} {params}", + _log.LogError(ex, "Could not convert RCon data payload to desired encoding {Encoding} {Params}", _gameEncoding.EncodingName, parameters); } @@ -168,7 +165,7 @@ namespace Integrations.Cod { if (connectionState.OnComplete.CurrentCount == 0) { - connectionState.OnComplete.Release(1); + connectionState.OnComplete.Release(); } } @@ -192,9 +189,6 @@ namespace Integrations.Cod ExclusiveAddressUse = true, }) { - connectionState.SendEventArgs.UserToken = socket; - connectionState.ConnectionAttempts++; - // wait for send to complete try { @@ -208,7 +202,7 @@ namespace Integrations.Cod { if (connectionState.OnComplete.CurrentCount == 0 ) { - connectionState.OnComplete.Release(1); + connectionState.OnComplete.Release(); } } @@ -225,15 +219,18 @@ namespace Integrations.Cod { if (connectionState.OnComplete.CurrentCount == 0 ) { - connectionState.OnComplete.Release(1); + connectionState.OnComplete.Release(); } if (connectionState.OnSentData.CurrentCount == 0) { - connectionState.OnSentData.Release(1); + connectionState.OnSentData.Release(); } } + connectionState.SendEventArgs.UserToken = socket; + connectionState.ConnectionAttempts++; + connectionState.BytesReadPerSegment.Clear(); var exceptionCaught = false; @@ -242,16 +239,10 @@ namespace Integrations.Cod try { - try - { - response = await SendPayloadAsync(payload, waitForResponse, - parser.OverrideTimeoutForCommand(parameters), token); - } - catch (OperationCanceledException) - { - // ignored - } - + connectionState.LastQuery = DateTime.Now; + response = await SendPayloadAsync(payload, waitForResponse, + _parser.OverrideTimeoutForCommand(parameters), token); + if ((response?.Length == 0 || response[0].Length == 0) && waitForResponse) { throw new RConException("Expected response but got 0 bytes back"); @@ -260,6 +251,12 @@ namespace Integrations.Cod connectionState.ConnectionAttempts = 0; } + catch (OperationCanceledException) + { + // if we timed out due to the cancellation token, + // we don't want to count that as an attempt + connectionState.ConnectionAttempts = 0; + } catch { // we want to retry with a delay @@ -268,8 +265,10 @@ namespace Integrations.Cod exceptionCaught = true; try { - await Task.Delay(StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), token); - + await Task.Delay( + token != CancellationToken.None + ? StaticHelpers.SocketTimeout(100) // if using cancellation token we don't care about attempt count + : StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), token); } catch (OperationCanceledException) { @@ -288,13 +287,12 @@ namespace Integrations.Cod connectionState.ConnectionAttempts = 0; throw new NetworkException("Reached maximum retry attempts to send RCon data to server"); } - finally { // we don't want to release if we're going to retry the query if (connectionState.OnComplete.CurrentCount == 0 && !exceptionCaught) { - connectionState.OnComplete.Release(1); + connectionState.OnComplete.Release(); } if (connectionState.OnSentData.CurrentCount == 0) @@ -330,20 +328,20 @@ namespace Integrations.Cod throw new RConException(Utilities.CurrentLocalization.LocalizationIndex["SERVER_ERROR_RCON_NOTSET"]); } - if (responseString.Contains(config.ServerNotRunningResponse)) + if (responseString.Contains(_config.ServerNotRunningResponse)) { throw new ServerException(Utilities.CurrentLocalization.LocalizationIndex["SERVER_ERROR_NOT_RUNNING"].FormatExt(Endpoint.ToString())); } - var responseHeaderMatch = Regex.Match(responseString, config.CommandPrefixes.RConResponse).Value; - var headerSplit = responseString.Split(type == StaticHelpers.QueryType.GET_INFO ? config.CommandPrefixes.RconGetInfoResponseHeader : responseHeaderMatch); + var responseHeaderMatch = Regex.Match(responseString, _config.CommandPrefixes.RConResponse).Value; + var headerSplit = responseString.Split(type == StaticHelpers.QueryType.GET_INFO ? _config.CommandPrefixes.RconGetInfoResponseHeader : responseHeaderMatch); if (headerSplit.Length != 2) { using (LogContext.PushProperty("Server", Endpoint.ToString())) { _log.LogWarning("Invalid response header from server. Expected {Expected}, but got {Response}", - config.CommandPrefixes.RConResponse, headerSplit.FirstOrDefault()); + _config.CommandPrefixes.RConResponse, headerSplit.FirstOrDefault()); } throw new RConException("Unexpected response header from server"); @@ -359,14 +357,14 @@ namespace Integrations.Cod /// /// array of segmented byte arrays /// - public string ReassembleSegmentedStatus(byte[][] segments) + private string ReassembleSegmentedStatus(byte[][] segments) { var splitStatusStrings = new List(); - foreach (byte[] segment in segments) + foreach (var segment in segments) { - string responseString = _gameEncoding.GetString(segment, 0, segment.Length); - var statusHeaderMatch = config.StatusHeader.PatternMatcher.Match(responseString); + var responseString = _gameEncoding.GetString(segment, 0, segment.Length); + var statusHeaderMatch = _config.StatusHeader.PatternMatcher.Match(responseString); if (statusHeaderMatch.Success) { splitStatusStrings.Insert(0, responseString.TrimEnd('\0')); @@ -374,7 +372,7 @@ namespace Integrations.Cod else { - splitStatusStrings.Add(responseString.Replace(config.CommandPrefixes.RConResponse, "").TrimEnd('\0')); + splitStatusStrings.Add(responseString.Replace(_config.CommandPrefixes.RConResponse, "").TrimEnd('\0')); } } @@ -386,9 +384,9 @@ namespace Integrations.Cod /// /// /// - private string RecombineMessages(byte[][] payload) + private string RecombineMessages(IReadOnlyList payload) { - if (payload.Length == 1) + if (payload.Count == 1) { return _gameEncoding.GetString(payload[0]).TrimEnd('\n') + '\n'; } @@ -396,12 +394,12 @@ namespace Integrations.Cod else { var builder = new StringBuilder(); - for (int i = 0; i < payload.Length; i++) + for (int i = 0; i < payload.Count; i++) { string message = _gameEncoding.GetString(payload[i]).TrimEnd('\n') + '\n'; if (i > 0) { - message = message.Replace(config.CommandPrefixes.RConResponse, ""); + message = message.Replace(_config.CommandPrefixes.RConResponse, ""); } builder.Append(message); } @@ -410,11 +408,17 @@ namespace Integrations.Cod } } - private async Task SendPayloadAsync(byte[] payload, bool waitForResponse, TimeSpan overrideTimeout, CancellationToken token = default) + private async Task SendPayloadAsync(byte[] payload, bool waitForResponse, TimeSpan overrideTimeout, + CancellationToken token = default) { - var connectionState = ActiveQueries[this.Endpoint]; + var connectionState = ActiveQueries[Endpoint]; var rconSocket = (Socket)connectionState.SendEventArgs.UserToken; + if (rconSocket is null) + { + throw new InvalidOperationException("State is not valid for socket operation"); + } + if (connectionState.ReceiveEventArgs.RemoteEndPoint == null && connectionState.SendEventArgs.RemoteEndPoint == null) { @@ -458,7 +462,7 @@ namespace Integrations.Cod if (connectionState.OnSentData.CurrentCount == 0) { - connectionState.OnSentData.Release(1); + connectionState.OnSentData.Release(); } throw new NetworkException("Timed out sending RCon data", rconSocket); @@ -502,7 +506,7 @@ namespace Integrations.Cod using (LogContext.PushProperty("Server", Endpoint.ToString())) { _log.LogWarning( - "Socket timed out while waiting for RCon response on attempt {attempt} with timeout delay of {timeout}", + "Socket timed out while waiting for RCon response on attempt {Attempt} with timeout delay of {Timeout}", connectionState.ConnectionAttempts, StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts)); } @@ -510,7 +514,7 @@ namespace Integrations.Cod if (connectionState.OnReceivedData.CurrentCount == 0) { - connectionState.OnReceivedData.Release(1); + connectionState.OnReceivedData.Release(); } rconSocket.Close(); @@ -542,34 +546,52 @@ namespace Integrations.Cod private void OnDataReceived(object sender, SocketAsyncEventArgs e) { - _log.LogDebug("Read {bytesTransferred} bytes from {endpoint}", e.BytesTransferred, e.RemoteEndPoint); + _log.LogDebug("Read {BytesTransferred} bytes from {Endpoint}", e.BytesTransferred, e.RemoteEndPoint?.ToString()); // this occurs when we close the socket if (e.BytesTransferred == 0) { _log.LogDebug("No bytes were transmitted so the connection was probably closed"); - var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; - if (semaphore.CurrentCount == 0) + var semaphore = ActiveQueries[Endpoint].OnReceivedData; + + try { - semaphore.Release(); + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } } - + catch + { + // ignored because we can have the socket operation cancelled (which releases the semaphore) but + // this thread is not notified because it's an event + } + return; } if (sender is not Socket sock) { - var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; - if (semaphore.CurrentCount == 0) + var semaphore = ActiveQueries[Endpoint].OnReceivedData; + + try { - semaphore.Release(); + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } } - + catch + { + // ignored because we can have the socket operation cancelled (which releases the semaphore) but + // this thread is not notified because it's an event + } + return; } - var state = ActiveQueries[this.Endpoint]; + var state = ActiveQueries[Endpoint]; state.BytesReadPerSegment.Add(e.BytesTransferred); // I don't even want to know why this works for getting more data from Cod4x @@ -580,18 +602,18 @@ namespace Integrations.Cod try { var totalBytesTransferred = e.BytesTransferred; - _log.LogDebug("{total} total bytes transferred with {available} bytes remaining", totalBytesTransferred, + _log.LogDebug("{Total} total bytes transferred with {Available} bytes remaining", totalBytesTransferred, sock.Available); // we still have available data so the payload was segmented while (sock.Available > 0) { - _log.LogDebug("{available} more bytes to be read", sock.Available); + _log.LogDebug("{Available} more bytes to be read", sock.Available); var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length; if (bufferSpaceAvailable >= 0) { _log.LogWarning( - "Not enough buffer space to store incoming data {bytesNeeded} additional bytes required", + "Not enough buffer space to store incoming data {BytesNeeded} additional bytes required", bufferSpaceAvailable); continue; } @@ -604,8 +626,8 @@ namespace Integrations.Cod continue; } - _log.LogDebug("Read {bytesTransferred} synchronous bytes from {endpoint}", - state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint); + _log.LogDebug("Read {BytesTransferred} synchronous bytes from {Endpoint}", + state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint?.ToString()); // we need to increment this here because the callback isn't executed if there's no pending IO state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred); totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred; @@ -619,22 +641,38 @@ namespace Integrations.Cod finally { - var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; - if (semaphore.CurrentCount == 0) + var semaphore = ActiveQueries[Endpoint].OnReceivedData; + try { - semaphore.Release(); + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } + } + catch + { + // ignored because we can have the socket operation cancelled (which releases the semaphore) but + // this thread is not notified because it's an event } } } private void OnDataSent(object sender, SocketAsyncEventArgs e) { - _log.LogDebug("Sent {byteCount} bytes to {endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint); + _log.LogDebug("Sent {ByteCount} bytes to {Endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint?.ToString()); - var semaphore = ActiveQueries[this.Endpoint].OnSentData; - if (semaphore.CurrentCount == 0) + var semaphore = ActiveQueries[Endpoint].OnSentData; + try { - semaphore.Release(); + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } + } + catch + { + // ignored because we can have the socket operation cancelled (which releases the semaphore) but + // this thread is not notified because it's an event } } }