From cbd0eaa353d82418f86d2f30291cf4ee55e7e280 Mon Sep 17 00:00:00 2001 From: Young Date: Sun, 20 Oct 2024 20:34:14 +0800 Subject: [PATCH] updated event bus --- .../IDynamicIntegrationEventHandler.cs | 6 - src/Infrastructure.EventBus/IEventBus.cs | 8 +- .../IIntegrationEventHandler.cs | 10 +- .../Infrastructure.EventBus.csproj | 4 + ...Connection.cs => IPersistentConnection.cs} | 3 +- .../RabbitMQ/RabbitMQEventBus.cs | 308 ++++++++++++++++++ .../RabbitMQ/RabbitMQPersistentConnection.cs | 129 ++++---- .../RabbitMQ/RabbitMqEventBus.cs | 38 --- .../IEventBusSubscriptionManager.cs | 28 ++ .../InMemoryEventBusSubscriptionManager.cs | 109 +++++++ .../Subscriptions/Subscription.cs | 13 + 11 files changed, 539 insertions(+), 117 deletions(-) delete mode 100644 src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs rename src/Infrastructure.EventBus/RabbitMQ/{IRabbitMQPersistentConnection.cs => IPersistentConnection.cs} (69%) create mode 100644 src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs delete mode 100644 src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs create mode 100644 src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs create mode 100644 src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs create mode 100644 src/Infrastructure.EventBus/Subscriptions/Subscription.cs diff --git a/src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs b/src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs deleted file mode 100644 index 260fc2f..0000000 --- a/src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Infrastructure.EventBus; - -public interface IDynamicIntegrationEventHandler -{ - Task Handle(dynamic eventData); -} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IEventBus.cs b/src/Infrastructure.EventBus/IEventBus.cs index 0d9d013..3b0bbce 100644 --- a/src/Infrastructure.EventBus/IEventBus.cs +++ b/src/Infrastructure.EventBus/IEventBus.cs @@ -4,11 +4,7 @@ public interface IEventBus { void Publish(IntegrationEvent integrationEvent); - void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler; + void Subscribe() where TEvent : IntegrationEvent where TEventHandler : IIntegrationEventHandler; - void Unsubscribe() where TH : IIntegrationEventHandler where T : IntegrationEvent; - - void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler; - - void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler; + void Unsubscribe() where TEventHandler : IIntegrationEventHandler where TEvent : IntegrationEvent; } \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IIntegrationEventHandler.cs b/src/Infrastructure.EventBus/IIntegrationEventHandler.cs index ebf8228..5bef033 100644 --- a/src/Infrastructure.EventBus/IIntegrationEventHandler.cs +++ b/src/Infrastructure.EventBus/IIntegrationEventHandler.cs @@ -1,11 +1,7 @@ namespace Infrastructure.EventBus; -public interface IIntegrationEventHandler -{ -} - -public interface IIntegrationEventHandler : IIntegrationEventHandler +public interface IIntegrationEventHandler where TIntegrationEvent : IntegrationEvent { - Task Handle(TIntegrationEvent integrationEvent); -} + Task HandleAsync(TIntegrationEvent integrationEvent); +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj b/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj index 06a3495..76221e9 100644 --- a/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj +++ b/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj @@ -16,4 +16,8 @@ + + + + diff --git a/src/Infrastructure.EventBus/RabbitMQ/IRabbitMQPersistentConnection.cs b/src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs similarity index 69% rename from src/Infrastructure.EventBus/RabbitMQ/IRabbitMQPersistentConnection.cs rename to src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs index 50e7b7e..1e02a51 100644 --- a/src/Infrastructure.EventBus/RabbitMQ/IRabbitMQPersistentConnection.cs +++ b/src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs @@ -1,7 +1,8 @@ namespace Infrastructure.EventBus.RabbitMQ; -public interface IRabbitMQPersistentConnection : IDisposable +public interface IPersistentConnection : IDisposable { + event EventHandler OnReconnectedAfterConnectionFailure; bool IsConnected { get; } bool TryConnect(); diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs new file mode 100644 index 0000000..6ea6706 --- /dev/null +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs @@ -0,0 +1,308 @@ +using System.Net.Sockets; +using System.Reflection; +using System.Text; +using Infrastructure.EventBus.Subscriptions; +using Infrastructure.Utils; +using Polly; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; + +namespace Infrastructure.EventBus.RabbitMQ; + +public class RabbitMQEventBus : IEventBus +{ + private readonly IPersistentConnection _persistentConnection; + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly IEventBusSubscriptionManager _eventBusSubscriptionManager; + private readonly string _exchangeName; + private readonly string _queueName; + + public RabbitMQEventBus(IPersistentConnection persistentConnection, + ILogger logger, + IServiceProvider serviceProvider, + IEventBusSubscriptionManager eventBusSubscriptionManager, + string exchangeName, + string queueName) + { + _persistentConnection = persistentConnection; + _logger = logger; + _serviceProvider = serviceProvider; + _eventBusSubscriptionManager = eventBusSubscriptionManager; + _exchangeName = exchangeName; + _queueName = queueName; + ConfigureMessageBroker(); + } + + private readonly int _publishRetryCount = 5; + private IModel _consumerChannel; + private readonly TimeSpan _subscribeRetryTime = TimeSpan.FromSeconds(5); + + public void Publish(IntegrationEvent integrationEvent) + { + if (_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + var policy = Policy + .Handle() + .Or() + .WaitAndRetry(_publishRetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (exception, timeSpan) => + { + _logger.LogWarning(exception, + "Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}.", + integrationEvent.Id, + $"{timeSpan.TotalSeconds:n1}", exception.Message); + }); + var eventName = integrationEvent.GetType().Name; + _logger.LogTrace("Creating RabbitMQ channel to publish event #{EventId} ({EventName})...", integrationEvent.Id, + eventName); + using var channel = _persistentConnection.CreateModel(); + _logger.LogTrace("Declaring RabbitMQ exchange to publish event #{EventId}...", integrationEvent.Id); + channel.ExchangeDeclare(exchange: _exchangeName, type: "direct"); + + var message = integrationEvent.Serialize(); + var body = Encoding.UTF8.GetBytes(message); + policy.Execute(() => + { + var properties = channel.CreateBasicProperties(); + properties.DeliveryMode = 2; + _logger.LogTrace("Publishing event to RabbitMQ with ID #{EventId}...", integrationEvent.Id); + channel.BasicPublish( + exchange: _exchangeName, + routingKey: eventName, + mandatory: true, + basicProperties: properties, + body: body); + _logger.LogTrace("Published event with ID #{EventId}.", integrationEvent.Id); + }); + } + + public void Subscribe() where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler + { + var eventName = _eventBusSubscriptionManager.GetEventIdentifier(); + var eventHandlerName = typeof(TEventHandler).Name; + AddQueueBindForEventSubscription(eventName); + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}...", eventName, eventHandlerName); + _eventBusSubscriptionManager.AddSubscription(); + _logger.LogInformation("Subscribed to event {EventName} with {EvenHandler}.", eventName, eventHandlerName); + } + + public void Unsubscribe() where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler + { + var eventName = _eventBusSubscriptionManager.GetEventIdentifier(); + _logger.LogInformation("Unsubscribing from event {EventName}...", eventName); + + _eventBusSubscriptionManager.RemoveSubscription(); + + _logger.LogInformation("Unsubscribed from event {EventName}.", eventName); + } + + + private void ConfigureMessageBroker() + { + _consumerChannel = CreateConsumerChannel(); + _eventBusSubscriptionManager.OnEventRemoved += SubscriptionManager_OnEventRemoved; + _persistentConnection.OnReconnectedAfterConnectionFailure += + PersistentConnection_OnReconnectedAfterConnectionFailure; + } + + private void PersistentConnection_OnReconnectedAfterConnectionFailure(object sender, EventArgs e) + { + DoCreateConsumerChannel(); + RecreateSubscriptions(); + } + + private void RecreateSubscriptions() + { + var subscriptions = _eventBusSubscriptionManager.GetAllSubscriptions(); + _eventBusSubscriptionManager.Clear(); + + var eventBusType = this.GetType(); + + foreach (var entry in subscriptions) + { + foreach (var genericSubscribe in entry.Value.Select(subscription => eventBusType.GetMethod("Subscribe") + .MakeGenericMethod(subscription.EventType, subscription.HandlerType))) + { + genericSubscribe.Invoke(this, null); + } + } + } + + private void DoCreateConsumerChannel() + { + _consumerChannel.Dispose(); + _consumerChannel = CreateConsumerChannel(); + StartBasicConsume(); + } + + private void StartBasicConsume() + { + _logger.LogTrace("Starting RabbitMQ basic consume..."); + + if (_consumerChannel == null) + { + _logger.LogError("Could not start basic consume because consumer channel is null."); + return; + } + + var consumer = new AsyncEventingBasicConsumer(_consumerChannel); + consumer.Received += Consumer_Received; + + _consumerChannel.BasicConsume + ( + queue: _queueName, + autoAck: false, + consumer: consumer + ); + + _logger.LogTrace("Started RabbitMQ basic consume."); + } + + private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) + { + var eventName = eventArgs.RoutingKey; + var message = Encoding.UTF8.GetString(eventArgs.Body.Span); + + var isAcknowledged = false; + + try + { + await ProcessEvent(eventName, message); + + _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); + isAcknowledged = true; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error processing the following message: {Message}.", message); + } + finally + { + if (!isAcknowledged) + { + await TryEnqueueMessageAgainAsync(eventArgs); + } + } + } + + + private async Task TryEnqueueMessageAgainAsync(BasicDeliverEventArgs eventArgs) + { + try + { + _logger.LogWarning("Adding message to queue again with {Time} seconds delay...", + $"{_subscribeRetryTime.TotalSeconds:n1}"); + + await Task.Delay(_subscribeRetryTime); + _consumerChannel.BasicNack(eventArgs.DeliveryTag, false, true); + + _logger.LogTrace("Message added to queue again."); + } + catch (Exception ex) + { + _logger.LogError("Could not enqueue message again: {Error}.", ex.Message); + } + } + + private async Task ProcessEvent(string eventName, string message) + { + _logger.LogTrace("Processing RabbitMQ event: {EventName}...", eventName); + + if (!_eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName)) + { + _logger.LogTrace("There are no subscriptions for this event."); + return; + } + + var subscriptions = _eventBusSubscriptionManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) + { + var handler = _serviceProvider.GetService(subscription.HandlerType); + if (handler == null) + { + _logger.LogWarning("There are no handlers for the following event: {EventName}", eventName); + continue; + } + + var eventType = _eventBusSubscriptionManager.GetEventTypeByName(eventName); + + var @event = JsonConvert.DeserializeObject(message, eventType); + var eventHandlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + await Task.Yield(); + await (Task)eventHandlerType.GetMethod(nameof(IIntegrationEventHandler.HandleAsync)) + .Invoke(handler, [@event]); + } + + _logger.LogTrace("Processed event {EventName}.", eventName); + } + + private void SubscriptionManager_OnEventRemoved(object sender, string eventName) + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + using var channel = _persistentConnection.CreateModel(); + channel.QueueUnbind(queue: _queueName, exchange: _exchangeName, routingKey: eventName); + + if (_eventBusSubscriptionManager.IsEmpty) + { + _consumerChannel.Close(); + } + } + + private IModel CreateConsumerChannel() + { + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + _logger.LogTrace("Creating RabbitMQ consumer channel..."); + + var channel = _persistentConnection.CreateModel(); + + channel.ExchangeDeclare(exchange: _exchangeName, type: "direct"); + channel.QueueDeclare + ( + queue: _queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null + ); + + channel.CallbackException += (_, ea) => + { + _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel..."); + DoCreateConsumerChannel(); + }; + + _logger.LogTrace("Created RabbitMQ consumer channel."); + return channel; + } + + private void AddQueueBindForEventSubscription(string eventName) + { + var containsKey = _eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName); + if (containsKey) + { + return; + } + + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } + + using var channel = _persistentConnection.CreateModel(); + channel.QueueBind(queue: _queueName, exchange: _exchangeName, routingKey: eventName); + } +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs index ecd49f0..2022a2d 100644 --- a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs @@ -1,7 +1,6 @@ using System.Net.Sockets; using System.Text; using Polly; -using Polly.Retry; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -10,58 +9,64 @@ namespace Infrastructure.EventBus.RabbitMQ; public class RabbitMQPersistentConnection( IConnectionFactory connectionFactory, ILogger logger, - int retryCount = 5) - : IRabbitMQPersistentConnection + int timeoutBeforeReconnecting = 15) + : IPersistentConnection { - private IConnection? _connection; + private IConnection _connection; private bool _disposed; - private object sync_root = new object(); - - public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed; + private readonly object _locker = new(); + private bool _connectionFailed; + private readonly TimeSpan _timeoutBeforeReconnecting = TimeSpan.FromSeconds(timeoutBeforeReconnecting); + public event EventHandler? OnReconnectedAfterConnectionFailure; + public bool IsConnected => _connection is { IsOpen: true } && !_disposed; public bool TryConnect() { - logger.LogInformation("RabbitMQ Client is trying to connect"); + logger.LogInformation("Trying to connect to RabbitMQ..."); - lock (sync_root) + lock (_locker) { - var policy = RetryPolicy.Handle() + // Creates a policy to retry connecting to message broker until it succeeds. + var policy = Policy + .Handle() .Or() - .WaitAndRetry(retryCount, - retryAttempt => - TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + .WaitAndRetryForever((duration) => _timeoutBeforeReconnecting, (ex, time) => { logger.LogWarning(ex, - "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", - $"{time.TotalSeconds:n1}", ex.Message); - } - ); + "RabbitMQ Client could not connect after {TimeOut} seconds ({ExceptionMessage}). Waiting to try again...", + $"{(int)time.TotalSeconds}", ex.Message); + }); - policy.Execute(() => - { - _connection = connectionFactory - .CreateConnection(); - }); + policy.Execute(() => { _connection = connectionFactory.CreateConnection(); }); - if (IsConnected) + if (!IsConnected) { - _connection.ConnectionShutdown += OnConnectionShutdown; - _connection.CallbackException += OnCallbackException; - _connection.ConnectionBlocked += OnConnectionBlocked; + logger.LogCritical("ERROR: could not connect to RabbitMQ."); + _connectionFailed = true; + return false; + } + + // These event handlers hadle situations where the connection is lost by any reason. They try to reconnect the client. + _connection.ConnectionShutdown += OnConnectionShutdown; + _connection.CallbackException += OnCallbackException; + _connection.ConnectionBlocked += OnConnectionBlocked; + _connection.ConnectionUnblocked += OnConnectionUnblocked; - logger.LogInformation( - "RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", - _connection.Endpoint.HostName); + logger.LogInformation( + "RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", + _connection.Endpoint.HostName); + // If the connection has failed previously because of a RabbitMQ shutdown or something similar, we need to guarantee that the exchange and queues exist again. + // It's also necessary to rebind all application event handlers. We use this event handler below to do this. + if (!_connectionFailed) + { return true; } - else - { - logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); - return false; - } + OnReconnectedAfterConnectionFailure?.Invoke(this, null); + _connectionFailed = false; + return true; } } @@ -89,16 +94,13 @@ public class RabbitMQPersistentConnection( channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new AsyncEventingBasicConsumer(channel); - consumer.Received += new AsyncEventHandler( - async (a, b) => - { - var Headers = b.BasicProperties.Headers; - var msgBody = b.Body.ToArray(); - var message = Encoding.UTF8.GetString(msgBody); - await Task.CompletedTask; - Console.WriteLine("Received message: {0}", message); - } - ); + consumer.Received += async (a, b) => + { + var msgBody = b.Body.ToArray(); + var message = Encoding.UTF8.GetString(msgBody); + await Task.CompletedTask; + Console.WriteLine("Received message: {0}", message); + }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); @@ -107,7 +109,10 @@ public class RabbitMQPersistentConnection( public void Dispose() { - if (_disposed) return; + if (_disposed) + { + return; + } _disposed = true; @@ -123,37 +128,43 @@ public class RabbitMQPersistentConnection( private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { - if (_disposed) - { - return; - } - + _connectionFailed = true; logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); - TryConnect(); + TryConnectIfNotDisposed(); } private void OnCallbackException(object sender, CallbackExceptionEventArgs e) { - if (_disposed) - { - return; - } - + _connectionFailed = true; logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); - TryConnect(); + TryConnectIfNotDisposed(); } private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) + { + _connectionFailed = true; + logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); + + TryConnectIfNotDisposed(); + } + + private void OnConnectionUnblocked(object sender, EventArgs args) + { + _connectionFailed = true; + logger.LogWarning("A RabbitMQ connection is unblocked. Trying to re-connect..."); + TryConnectIfNotDisposed(); + } + + private void TryConnectIfNotDisposed() { if (_disposed) { + logger.LogInformation("RabbitMQ client is disposed. No action will be taken."); return; } - logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); - TryConnect(); } } \ No newline at end of file diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs deleted file mode 100644 index 4b60cf9..0000000 --- a/src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs +++ /dev/null @@ -1,38 +0,0 @@ -namespace Infrastructure.EventBus.RabbitMQ; - -public class RabbitMqEventBus( - IRabbitMQPersistentConnection persistentConnection, - ILogger logger - - ) : IEventBus, IDisposable -{ - public void Dispose() - { - // TODO release managed resources here - } - - public void Publish(IntegrationEvent integrationEvent) - { - throw new NotImplementedException(); - } - - public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler - { - throw new NotImplementedException(); - } - - public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler - { - throw new NotImplementedException(); - } - - public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler - { - throw new NotImplementedException(); - } - - public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler - { - throw new NotImplementedException(); - } -} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs b/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs new file mode 100644 index 0000000..a28d186 --- /dev/null +++ b/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs @@ -0,0 +1,28 @@ +namespace Infrastructure.EventBus.Subscriptions; + +public interface IEventBusSubscriptionManager +{ + event EventHandler OnEventRemoved; + + bool IsEmpty { get; } + + bool HasSubscriptionsForEvent(string eventName); + + string GetEventIdentifier(); + + Type GetEventTypeByName(string eventName); + + IEnumerable GetHandlersForEvent(string eventName); + + Dictionary> GetAllSubscriptions(); + + void AddSubscription() + where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler; + + void RemoveSubscription() + where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler; + + void Clear(); +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs b/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs new file mode 100644 index 0000000..d256f23 --- /dev/null +++ b/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs @@ -0,0 +1,109 @@ +namespace Infrastructure.EventBus.Subscriptions; + +public class InMemoryEventBusSubscriptionManager : IEventBusSubscriptionManager +{ + private readonly Dictionary> _handlers = new(); + private readonly List _eventTypes = []; + public event EventHandler OnEventRemoved; + public string GetEventIdentifier() => typeof(TEvent).Name; + + public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName); + + public IEnumerable GetHandlersForEvent(string eventName) => _handlers[eventName]; + + + public Dictionary> GetAllSubscriptions() => new(_handlers); + + + public void AddSubscription() + where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler + { + var eventName = GetEventIdentifier(); + + DoAddSubscription(typeof(TEvent), typeof(TEventHandler), eventName); + + if (!_eventTypes.Contains(typeof(TEvent))) + { + _eventTypes.Add(typeof(TEvent)); + } + } + + public void RemoveSubscription() + where TEventHandler : IIntegrationEventHandler + where TEvent : IntegrationEvent + { + var handlerToRemove = FindSubscriptionToRemove(); + var eventName = GetEventIdentifier(); + DoRemoveHandler(eventName, handlerToRemove); + } + + public void Clear() + { + _handlers.Clear(); + _eventTypes.Clear(); + } + + + public bool IsEmpty => !_handlers.Keys.Any(); + public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); + + + private void DoAddSubscription(Type eventType, Type handlerType, string eventName) + { + if (!HasSubscriptionsForEvent(eventName)) + { + _handlers.Add(eventName, new List()); + } + + if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) + { + throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", + nameof(handlerType)); + } + + _handlers[eventName].Add(new Subscription(eventType, handlerType)); + } + + private void DoRemoveHandler(string eventName, Subscription subscriptionToRemove) + { + if (subscriptionToRemove == null) + { + return; + } + + _handlers[eventName].Remove(subscriptionToRemove); + if (_handlers[eventName].Any()) + { + return; + } + + _handlers.Remove(eventName); + var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); + if (eventType != null) + { + _eventTypes.Remove(eventType); + } + + RaiseOnEventRemoved(eventName); + } + + private void RaiseOnEventRemoved(string eventName) + { + var handler = OnEventRemoved; + handler?.Invoke(this, eventName); + } + + private Subscription FindSubscriptionToRemove() + where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler + { + var eventName = GetEventIdentifier(); + return DoFindSubscriptionToRemove(eventName, typeof(TEventHandler)); + } + + private Subscription DoFindSubscriptionToRemove(string eventName, Type handlerType) + { + return !HasSubscriptionsForEvent(eventName) ? null : _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType); + } +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/Subscriptions/Subscription.cs b/src/Infrastructure.EventBus/Subscriptions/Subscription.cs new file mode 100644 index 0000000..4d17a0e --- /dev/null +++ b/src/Infrastructure.EventBus/Subscriptions/Subscription.cs @@ -0,0 +1,13 @@ +namespace Infrastructure.EventBus.Subscriptions; + +public class Subscription +{ + public Type EventType { get; private set; } + public Type HandlerType { get; private set; } + + public Subscription(Type eventType, Type handlerType) + { + EventType = eventType; + HandlerType = handlerType; + } +} \ No newline at end of file