IW4M-Admin/Application/CoreEventHandler.cs
2023-04-05 09:54:57 -05:00

146 lines
5.4 KiB
C#

using System;
using System.Collections.Concurrent;
using SharedLibraryCore;
using SharedLibraryCore.Events;
using SharedLibraryCore.Interfaces;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using SharedLibraryCore.Events.Management;
using SharedLibraryCore.Events.Server;
using SharedLibraryCore.Interfaces.Events;
using ILogger = Microsoft.Extensions.Logging.ILogger;
namespace IW4MAdmin.Application
{
public class CoreEventHandler : ICoreEventHandler
{
private const int MaxCurrentEvents = 25;
private readonly ILogger _logger;
private readonly SemaphoreSlim _onProcessingEvents = new(MaxCurrentEvents, MaxCurrentEvents);
private readonly ManualResetEventSlim _onEventReady = new(false);
private readonly ConcurrentQueue<(IManager, CoreEvent)> _runningEventTasks = new();
private CancellationToken _cancellationToken;
private int _activeTasks;
private static readonly GameEvent.EventType[] OverrideEvents =
{
GameEvent.EventType.Connect,
GameEvent.EventType.Disconnect,
GameEvent.EventType.Quit,
GameEvent.EventType.Stop
};
public CoreEventHandler(ILogger<CoreEventHandler> logger)
{
_logger = logger;
}
public void QueueEvent(IManager manager, CoreEvent coreEvent)
{
_runningEventTasks.Enqueue((manager, coreEvent));
_onEventReady.Set();
}
public void StartProcessing(CancellationToken token)
{
_cancellationToken = token;
while (!_cancellationToken.IsCancellationRequested)
{
_onEventReady.Reset();
try
{
_onProcessingEvents.Wait(_cancellationToken);
if (!_runningEventTasks.TryDequeue(out var coreEvent))
{
if (_onProcessingEvents.CurrentCount < MaxCurrentEvents)
{
_onProcessingEvents.Release(1);
}
_onEventReady.Wait(_cancellationToken);
continue;
}
_logger.LogDebug("Start processing event {Name} {SemaphoreCount} - {QueuedTasks}",
coreEvent.Item2.GetType().Name, _onProcessingEvents.CurrentCount, _runningEventTasks.Count);
_ = Task.Factory.StartNew(() =>
{
Interlocked.Increment(ref _activeTasks);
_logger.LogDebug("[Start] Active Tasks = {TaskCount}", _activeTasks);
return HandleEventTaskExecute(coreEvent);
});
}
catch (OperationCanceledException)
{
// ignored
}
catch (Exception ex)
{
_logger.LogError(ex, "Could not enqueue event for processing");
}
}
}
private async Task HandleEventTaskExecute((IManager, CoreEvent) coreEvent)
{
try
{
await GetEventTask(coreEvent.Item1, coreEvent.Item2);
}
catch (OperationCanceledException)
{
_logger.LogWarning("Event timed out {Type}", coreEvent.Item2.GetType().Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "Could not complete invoke for {EventType}",
coreEvent.Item2.GetType().Name);
}
finally
{
if (_onProcessingEvents.CurrentCount < MaxCurrentEvents)
{
_logger.LogDebug("Freeing up event semaphore for next event {SemaphoreCount}",
_onProcessingEvents.CurrentCount);
_onProcessingEvents.Release(1);
}
Interlocked.Decrement(ref _activeTasks);
_logger.LogDebug("[Complete] {Type}, Active Tasks = {TaskCount} - {Queue}", coreEvent.Item2.GetType(),
_activeTasks, _runningEventTasks.Count);
}
}
private Task GetEventTask(IManager manager, CoreEvent coreEvent)
{
return coreEvent switch
{
GameEvent gameEvent => BuildLegacyEventTask(manager, coreEvent, gameEvent),
GameServerEvent gameServerEvent => IGameServerEventSubscriptions.InvokeEventAsync(gameServerEvent,
manager.CancellationToken),
ManagementEvent managementEvent => IManagementEventSubscriptions.InvokeEventAsync(managementEvent,
manager.CancellationToken),
_ => Task.CompletedTask
};
}
private async Task BuildLegacyEventTask(IManager manager, CoreEvent coreEvent, GameEvent gameEvent)
{
if (manager.IsRunning || OverrideEvents.Contains(gameEvent.Type))
{
await manager.ExecuteEvent(gameEvent);
await IGameEventSubscriptions.InvokeEventAsync(coreEvent, manager.CancellationToken);
return;
}
_logger.LogDebug("Skipping event as we're shutting down {EventId}", gameEvent.IncrementalId);
}
}
}