small optimizations

This commit is contained in:
RaidMax 2022-02-02 16:21:08 -06:00
parent b7a76cc4a2
commit a0f4ceccfe
3 changed files with 72 additions and 32 deletions

View File

@ -150,9 +150,9 @@ namespace Integrations.Cod
}) })
{ {
connectionState.SendEventArgs.UserToken = socket; connectionState.SendEventArgs.UserToken = socket;
connectionState.OnSentData.Reset();
connectionState.OnReceivedData.Reset();
connectionState.ConnectionAttempts++; connectionState.ConnectionAttempts++;
await connectionState.OnSentData.WaitAsync();
await connectionState.OnReceivedData.WaitAsync();
connectionState.BytesReadPerSegment.Clear(); connectionState.BytesReadPerSegment.Clear();
bool exceptionCaught = false; bool exceptionCaught = false;
@ -199,6 +199,16 @@ namespace Integrations.Cod
{ {
connectionState.OnComplete.Release(1); connectionState.OnComplete.Release(1);
} }
if (connectionState.OnSentData.CurrentCount == 0)
{
connectionState.OnSentData.Release();
}
if (connectionState.OnReceivedData.CurrentCount == 0)
{
connectionState.OnReceivedData.Release();
}
} }
} }
@ -328,7 +338,8 @@ namespace Integrations.Cod
{ {
// the send has not been completed asynchronously // the send has not been completed asynchronously
// this really shouldn't ever happen because it's UDP // this really shouldn't ever happen because it's UDP
if (!await Task.Run(() => connectionState.OnSentData.Wait(StaticHelpers.SocketTimeout(1))))
if(!await connectionState.OnSentData.WaitAsync(StaticHelpers.SocketTimeout(1)))
{ {
using(LogContext.PushProperty("Server", Endpoint.ToString())) using(LogContext.PushProperty("Server", Endpoint.ToString()))
{ {
@ -342,7 +353,7 @@ namespace Integrations.Cod
if (!waitForResponse) if (!waitForResponse)
{ {
return new byte[0][]; return Array.Empty<byte[]>();
} }
connectionState.ReceiveEventArgs.SetBuffer(connectionState.ReceiveBuffer); connectionState.ReceiveEventArgs.SetBuffer(connectionState.ReceiveBuffer);
@ -353,12 +364,12 @@ namespace Integrations.Cod
if (receiveDataPending) if (receiveDataPending)
{ {
_log.LogDebug("Waiting to asynchronously receive data on attempt #{connectionAttempts}", connectionState.ConnectionAttempts); _log.LogDebug("Waiting to asynchronously receive data on attempt #{connectionAttempts}", connectionState.ConnectionAttempts);
if (!await Task.Run(() => connectionState.OnReceivedData.Wait( if (!await connectionState.OnReceivedData.WaitAsync(
new[] new[]
{ {
StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts), StaticHelpers.SocketTimeout(connectionState.ConnectionAttempts),
overrideTimeout overrideTimeout
}.Max()))) }.Max()))
{ {
if (connectionState.ConnectionAttempts > 1) // this reduces some spam for unstable connections if (connectionState.ConnectionAttempts > 1) // this reduces some spam for unstable connections
{ {
@ -407,12 +418,24 @@ namespace Integrations.Cod
if (e.BytesTransferred == 0) if (e.BytesTransferred == 0)
{ {
_log.LogDebug("No bytes were transmitted so the connection was probably closed"); _log.LogDebug("No bytes were transmitted so the connection was probably closed");
ActiveQueries[this.Endpoint].OnReceivedData.Set();
var semaphore = ActiveQueries[this.Endpoint].OnReceivedData;
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
return; return;
} }
if (!(sender is Socket sock)) if (sender is not Socket sock)
{ {
var semaphore = ActiveQueries[this.Endpoint].OnReceivedData;
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
return; return;
} }
@ -427,16 +450,19 @@ namespace Integrations.Cod
try try
{ {
var totalBytesTransferred = e.BytesTransferred; var totalBytesTransferred = e.BytesTransferred;
_log.LogDebug("{total} total bytes transferred with {available} bytes remaining", totalBytesTransferred, sock.Available); _log.LogDebug("{total} total bytes transferred with {available} bytes remaining", totalBytesTransferred,
sock.Available);
// we still have available data so the payload was segmented // we still have available data so the payload was segmented
while (sock.Available > 0) while (sock.Available > 0)
{ {
_log.LogDebug("{available} more bytes to be read", sock.Available); _log.LogDebug("{available} more bytes to be read", sock.Available);
var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length; var bufferSpaceAvailable = sock.Available + totalBytesTransferred - state.ReceiveBuffer.Length;
if (bufferSpaceAvailable >= 0 ) if (bufferSpaceAvailable >= 0)
{ {
_log.LogWarning("Not enough buffer space to store incoming data {bytesNeeded} additional bytes required", bufferSpaceAvailable); _log.LogWarning(
"Not enough buffer space to store incoming data {bytesNeeded} additional bytes required",
bufferSpaceAvailable);
continue; continue;
} }
@ -447,27 +473,39 @@ namespace Integrations.Cod
_log.LogDebug("Remaining bytes are async"); _log.LogDebug("Remaining bytes are async");
continue; continue;
} }
_log.LogDebug("Read {bytesTransferred} synchronous bytes from {endpoint}", state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint); _log.LogDebug("Read {bytesTransferred} synchronous bytes from {endpoint}",
state.ReceiveEventArgs.BytesTransferred, e.RemoteEndPoint);
// we need to increment this here because the callback isn't executed if there's no pending IO // we need to increment this here because the callback isn't executed if there's no pending IO
state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred); state.BytesReadPerSegment.Add(state.ReceiveEventArgs.BytesTransferred);
totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred; totalBytesTransferred += state.ReceiveEventArgs.BytesTransferred;
} }
ActiveQueries[this.Endpoint].OnReceivedData.Set();
} }
catch (ObjectDisposedException) catch (ObjectDisposedException)
{ {
_log.LogDebug("Socket was disposed while receiving data"); _log.LogDebug("Socket was disposed while receiving data");
ActiveQueries[this.Endpoint].OnReceivedData.Set(); }
finally
{
var semaphore = ActiveQueries[this.Endpoint].OnReceivedData;
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
} }
} }
private void OnDataSent(object sender, SocketAsyncEventArgs e) private void OnDataSent(object sender, SocketAsyncEventArgs e)
{ {
_log.LogDebug("Sent {byteCount} bytes to {endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint); _log.LogDebug("Sent {byteCount} bytes to {endpoint}", e.Buffer?.Length, e.ConnectSocket?.RemoteEndPoint);
ActiveQueries[this.Endpoint].OnSentData.Set();
var semaphore = ActiveQueries[this.Endpoint].OnSentData;
if (semaphore.CurrentCount == 0)
{
semaphore.Release();
}
} }
} }
} }

View File

@ -21,11 +21,12 @@ namespace Integrations.Cod
private const int BufferSize = 16384; private const int BufferSize = 16384;
public readonly byte[] ReceiveBuffer = new byte[BufferSize]; public readonly byte[] ReceiveBuffer = new byte[BufferSize];
public readonly SemaphoreSlim OnComplete = new SemaphoreSlim(1, 1); public readonly SemaphoreSlim OnComplete = new SemaphoreSlim(1, 1);
public readonly ManualResetEventSlim OnSentData = new ManualResetEventSlim(false); public readonly SemaphoreSlim OnSentData = new(1, 1);
public readonly ManualResetEventSlim OnReceivedData = new ManualResetEventSlim(false); public readonly SemaphoreSlim OnReceivedData = new (1, 1);
public List<int> BytesReadPerSegment { get; set; } = new List<int>(); public List<int> BytesReadPerSegment { get; set; } = new List<int>();
public SocketAsyncEventArgs SendEventArgs { get; set; } = new SocketAsyncEventArgs(); public SocketAsyncEventArgs SendEventArgs { get; set; } = new SocketAsyncEventArgs();
public SocketAsyncEventArgs ReceiveEventArgs { get; set; } = new SocketAsyncEventArgs(); public SocketAsyncEventArgs ReceiveEventArgs { get; set; } = new SocketAsyncEventArgs();
public DateTime LastQuery { get; set; } = DateTime.Now; public DateTime LastQuery { get; set; } = DateTime.Now;
} }
} }

View File

@ -249,7 +249,7 @@ namespace SharedLibraryCore
} }
private static long NextEventId; private static long NextEventId;
private readonly ManualResetEvent _eventFinishedWaiter; private readonly SemaphoreSlim _eventFinishedWaiter = new(0, 1);
public string Data; // Data is usually the message sent by player public string Data; // Data is usually the message sent by player
public string Message; public string Message;
public EFClient Origin; public EFClient Origin;
@ -260,10 +260,14 @@ namespace SharedLibraryCore
public GameEvent() public GameEvent()
{ {
_eventFinishedWaiter = new ManualResetEvent(false);
Time = DateTime.UtcNow; Time = DateTime.UtcNow;
Id = GetNextEventId(); Id = GetNextEventId();
} }
~GameEvent()
{
_eventFinishedWaiter.Dispose();
}
public EventSource Source { get; set; } public EventSource Source { get; set; }
@ -299,15 +303,12 @@ namespace SharedLibraryCore
return Interlocked.Increment(ref NextEventId); return Interlocked.Increment(ref NextEventId);
} }
~GameEvent()
{
_eventFinishedWaiter.Set();
_eventFinishedWaiter.Dispose();
}
public void Complete() public void Complete()
{ {
_eventFinishedWaiter.Set(); if (_eventFinishedWaiter.CurrentCount == 0)
{
_eventFinishedWaiter.Release();
}
} }
public async Task<GameEvent> WaitAsync() public async Task<GameEvent> WaitAsync()
@ -325,7 +326,7 @@ namespace SharedLibraryCore
Utilities.DefaultLogger.LogDebug("Begin wait for event {Id}", Id); Utilities.DefaultLogger.LogDebug("Begin wait for event {Id}", Id);
try try
{ {
processed = await Task.Run(() => _eventFinishedWaiter.WaitOne(timeSpan), token); processed = await _eventFinishedWaiter.WaitAsync(timeSpan, token);
} }
catch (TaskCanceledException) catch (TaskCanceledException)
@ -347,4 +348,4 @@ namespace SharedLibraryCore
return this; return this;
} }
} }
} }