diff --git a/Integrations/Cod/CodRConConnection.cs b/Integrations/Cod/CodRConConnection.cs index a93b58488..6282e4eda 100644 --- a/Integrations/Cod/CodRConConnection.cs +++ b/Integrations/Cod/CodRConConnection.cs @@ -150,9 +150,9 @@ namespace Integrations.Cod }) { connectionState.SendEventArgs.UserToken = socket; - connectionState.OnSentData.Reset(); - connectionState.OnReceivedData.Reset(); connectionState.ConnectionAttempts++; + await connectionState.OnSentData.WaitAsync(); + await connectionState.OnReceivedData.WaitAsync(); connectionState.BytesReadPerSegment.Clear(); bool exceptionCaught = false; @@ -199,6 +199,16 @@ namespace Integrations.Cod { connectionState.OnComplete.Release(1); } + + if (connectionState.OnSentData.CurrentCount == 0) + { + connectionState.OnSentData.Release(); + } + + if (connectionState.OnReceivedData.CurrentCount == 0) + { + connectionState.OnReceivedData.Release(); + } } } @@ -328,7 +338,8 @@ namespace Integrations.Cod { // the send has not been completed asynchronously // this really shouldn't ever happen because it's UDP - if (!await Task.Run(() => connectionState.OnSentData.Wait(StaticHelpers.SocketTimeout(1)))) + + if(!await connectionState.OnSentData.WaitAsync(StaticHelpers.SocketTimeout(1))) { using(LogContext.PushProperty("Server", Endpoint.ToString())) { @@ -342,7 +353,7 @@ namespace Integrations.Cod if (!waitForResponse) { - return new byte[0][]; + return Array.Empty(); } connectionState.ReceiveEventArgs.SetBuffer(connectionState.ReceiveBuffer); @@ -353,12 +364,12 @@ namespace Integrations.Cod if (receiveDataPending) { _log.LogDebug("Waiting to asynchronously receive data on attempt #{connectionAttempts}", connectionState.ConnectionAttempts); - if (!await Task.Run(() => connectionState.OnReceivedData.Wait( + if (!await connectionState.OnReceivedData.WaitAsync( new[] { StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), overrideTimeout - }.Max()))) + }.Max())) { if (connectionState.ConnectionAttempts > 1) // this reduces some spam for unstable connections { @@ -407,12 +418,24 @@ namespace Integrations.Cod if (e.BytesTransferred == 0) { _log.LogDebug("No bytes were transmitted so the connection was probably closed"); - ActiveQueries[this.Endpoint].OnReceivedData.Set(); + + var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } + return; } - if (!(sender is Socket sock)) + if (sender is not Socket sock) { + var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } + return; } @@ -427,16 +450,19 @@ namespace Integrations.Cod try { var totalBytesTransferred = e.BytesTransferred; - _log.LogDebug("{total} total bytes transferred with {available} bytes remaining", totalBytesTransferred, sock.Available); + _log.LogDebug("{total} total bytes transferred with {available} bytes remaining", totalBytesTransferred, + sock.Available); // we still have available data so the payload was segmented while (sock.Available > 0) { _log.LogDebug("{available} more bytes to be read", sock.Available); var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length; - if (bufferSpaceAvailable >= 0 ) + if (bufferSpaceAvailable >= 0) { - _log.LogWarning("Not enough buffer space to store incoming data {bytesNeeded} additional bytes required", bufferSpaceAvailable); + _log.LogWarning( + "Not enough buffer space to store incoming data {bytesNeeded} additional bytes required", + bufferSpaceAvailable); continue; } @@ -447,27 +473,39 @@ namespace Integrations.Cod _log.LogDebug("Remaining bytes are async"); continue; } - - _log.LogDebug("Read {bytesTransferred} synchronous bytes from {endpoint}", state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint); + + _log.LogDebug("Read {bytesTransferred} synchronous bytes from {endpoint}", + state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint); // we need to increment this here because the callback isn't executed if there's no pending IO state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred); totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred; } - - ActiveQueries[this.Endpoint].OnReceivedData.Set(); } catch (ObjectDisposedException) { _log.LogDebug("Socket was disposed while receiving data"); - ActiveQueries[this.Endpoint].OnReceivedData.Set(); + } + + finally + { + var semaphore = ActiveQueries[this.Endpoint].OnReceivedData; + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } } } private void OnDataSent(object sender, SocketAsyncEventArgs e) { _log.LogDebug("Sent {byteCount} bytes to {endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint); - ActiveQueries[this.Endpoint].OnSentData.Set(); + + var semaphore = ActiveQueries[this.Endpoint].OnSentData; + if (semaphore.CurrentCount == 0) + { + semaphore.Release(); + } } } -} \ No newline at end of file +} diff --git a/Integrations/Cod/ConnectionState.cs b/Integrations/Cod/ConnectionState.cs index 129111aa6..243511ea5 100644 --- a/Integrations/Cod/ConnectionState.cs +++ b/Integrations/Cod/ConnectionState.cs @@ -21,11 +21,12 @@ namespace Integrations.Cod private const int BufferSize = 16384; public readonly byte[] ReceiveBuffer = new byte[BufferSize]; public readonly SemaphoreSlim OnComplete = new SemaphoreSlim(1, 1); - public readonly ManualResetEventSlim OnSentData = new ManualResetEventSlim(false); - public readonly ManualResetEventSlim OnReceivedData = new ManualResetEventSlim(false); + public readonly SemaphoreSlim OnSentData = new(1, 1); + public readonly SemaphoreSlim OnReceivedData = new (1, 1); + public List BytesReadPerSegment { get; set; } = new List(); public SocketAsyncEventArgs SendEventArgs { get; set; } = new SocketAsyncEventArgs(); public SocketAsyncEventArgs ReceiveEventArgs { get; set; } = new SocketAsyncEventArgs(); public DateTime LastQuery { get; set; } = DateTime.Now; } -} \ No newline at end of file +} diff --git a/SharedLibraryCore/Events/GameEvent.cs b/SharedLibraryCore/Events/GameEvent.cs index c3e48db3c..4b6f1f846 100644 --- a/SharedLibraryCore/Events/GameEvent.cs +++ b/SharedLibraryCore/Events/GameEvent.cs @@ -249,7 +249,7 @@ namespace SharedLibraryCore } private static long NextEventId; - private readonly ManualResetEvent _eventFinishedWaiter; + private readonly SemaphoreSlim _eventFinishedWaiter = new(0, 1); public string Data; // Data is usually the message sent by player public string Message; public EFClient Origin; @@ -260,10 +260,14 @@ namespace SharedLibraryCore public GameEvent() { - _eventFinishedWaiter = new ManualResetEvent(false); Time = DateTime.UtcNow; Id = GetNextEventId(); } + + ~GameEvent() + { + _eventFinishedWaiter.Dispose(); + } public EventSource Source { get; set; } @@ -299,15 +303,12 @@ namespace SharedLibraryCore return Interlocked.Increment(ref NextEventId); } - ~GameEvent() - { - _eventFinishedWaiter.Set(); - _eventFinishedWaiter.Dispose(); - } - public void Complete() { - _eventFinishedWaiter.Set(); + if (_eventFinishedWaiter.CurrentCount == 0) + { + _eventFinishedWaiter.Release(); + } } public async Task WaitAsync() @@ -325,7 +326,7 @@ namespace SharedLibraryCore Utilities.DefaultLogger.LogDebug("Begin wait for event {Id}", Id); try { - processed = await Task.Run(() => _eventFinishedWaiter.WaitOne(timeSpan), token); + processed = await _eventFinishedWaiter.WaitAsync(timeSpan, token); } catch (TaskCanceledException) @@ -347,4 +348,4 @@ namespace SharedLibraryCore return this; } } -} \ No newline at end of file +}