From 2d9bfba52f6675352cfa23585ea1f1eaad7db40e Mon Sep 17 00:00:00 2001 From: Young Date: Fri, 25 Oct 2024 17:09:15 +0800 Subject: [PATCH] optimise codes --- src/Infrastructure.EventBus/GlobalUsings.cs | 1 - src/Infrastructure.EventBus/IEventBus.cs | 10 +- .../IIntegrationEventHandler.cs | 2 +- .../IntegrationEvent.cs | 4 +- .../RabbitMQ/IPersistentConnection.cs | 4 +- .../RabbitMQ/RabbitMQEventBus.cs | 218 ++++++++++-------- .../RabbitMQ/RabbitMQPersistentConnection.cs | 15 +- .../RabbitMQ/RabbitMqConnectionOptions.cs | 26 +++ .../ServiceCollectionExtensions.cs | 45 ++++ .../IEventBusSubscriptionManager.cs | 2 +- .../InMemoryEventBusSubscriptionManager.cs | 2 +- .../Subscriptions/Subscription.cs | 2 +- 12 files changed, 212 insertions(+), 119 deletions(-) create mode 100644 src/Infrastructure.EventBus/RabbitMQ/RabbitMqConnectionOptions.cs create mode 100644 src/Infrastructure.EventBus/ServiceCollectionExtensions.cs diff --git a/src/Infrastructure.EventBus/GlobalUsings.cs b/src/Infrastructure.EventBus/GlobalUsings.cs index fdab710..c26aa69 100644 --- a/src/Infrastructure.EventBus/GlobalUsings.cs +++ b/src/Infrastructure.EventBus/GlobalUsings.cs @@ -1,5 +1,4 @@ // Global using directives global using Microsoft.Extensions.Logging; -global using Newtonsoft.Json; global using RabbitMQ.Client; \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IEventBus.cs b/src/Infrastructure.EventBus/IEventBus.cs index 3b0bbce..7a01322 100644 --- a/src/Infrastructure.EventBus/IEventBus.cs +++ b/src/Infrastructure.EventBus/IEventBus.cs @@ -1,10 +1,12 @@ -namespace Infrastructure.EventBus; +namespace Roller.Infrastructure.EventBus; public interface IEventBus { - void Publish(IntegrationEvent integrationEvent); + void Publish(TEvent integrationEvent) where TEvent : IntegrationEvent; - void Subscribe() where TEvent : IntegrationEvent where TEventHandler : IIntegrationEventHandler; + void Subscribe() where TEvent : IntegrationEvent + where TEventHandler : IIntegrationEventHandler; - void Unsubscribe() where TEventHandler : IIntegrationEventHandler where TEvent : IntegrationEvent; + void Unsubscribe() where TEventHandler : IIntegrationEventHandler + where TEvent : IntegrationEvent; } \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IIntegrationEventHandler.cs b/src/Infrastructure.EventBus/IIntegrationEventHandler.cs index 5bef033..de05426 100644 --- a/src/Infrastructure.EventBus/IIntegrationEventHandler.cs +++ b/src/Infrastructure.EventBus/IIntegrationEventHandler.cs @@ -1,4 +1,4 @@ -namespace Infrastructure.EventBus; +namespace Roller.Infrastructure.EventBus; public interface IIntegrationEventHandler where TIntegrationEvent : IntegrationEvent diff --git a/src/Infrastructure.EventBus/IntegrationEvent.cs b/src/Infrastructure.EventBus/IntegrationEvent.cs index 91dbe82..3fa3a49 100644 --- a/src/Infrastructure.EventBus/IntegrationEvent.cs +++ b/src/Infrastructure.EventBus/IntegrationEvent.cs @@ -1,4 +1,6 @@ -namespace Infrastructure.EventBus; +using Newtonsoft.Json; + +namespace Roller.Infrastructure.EventBus; [method: JsonConstructor] public class IntegrationEvent(Guid id, DateTime createdDate) diff --git a/src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs b/src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs index 1e02a51..9a7b17a 100644 --- a/src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs +++ b/src/Infrastructure.EventBus/RabbitMQ/IPersistentConnection.cs @@ -1,6 +1,6 @@ -namespace Infrastructure.EventBus.RabbitMQ; +namespace Roller.Infrastructure.EventBus.RabbitMQ; -public interface IPersistentConnection : IDisposable +public interface IPersistentConnection { event EventHandler OnReconnectedAfterConnectionFailure; bool IsConnected { get; } diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs index 6ea6706..be33b85 100644 --- a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQEventBus.cs @@ -1,46 +1,50 @@ using System.Net.Sockets; -using System.Reflection; using System.Text; -using Infrastructure.EventBus.Subscriptions; -using Infrastructure.Utils; +using System.Text.Json; using Polly; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; +using Roller.Infrastructure.EventBus.Subscriptions; -namespace Infrastructure.EventBus.RabbitMQ; +namespace Roller.Infrastructure.EventBus.RabbitMQ; public class RabbitMQEventBus : IEventBus { - private readonly IPersistentConnection _persistentConnection; - private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; - private readonly IEventBusSubscriptionManager _eventBusSubscriptionManager; private readonly string _exchangeName; private readonly string _queueName; + private readonly int _publishRetryCount = 5; + private readonly TimeSpan _subscribeRetryTime = TimeSpan.FromSeconds(5); - public RabbitMQEventBus(IPersistentConnection persistentConnection, - ILogger logger, + private readonly IPersistentConnection _persistentConnection; + private readonly IEventBusSubscriptionManager _subscriptionsManager; + private readonly IServiceProvider _serviceProvider; + + private readonly ILogger _logger; + + private IModel _consumerChannel; + + public RabbitMQEventBus( + IPersistentConnection persistentConnection, + IEventBusSubscriptionManager subscriptionsManager, IServiceProvider serviceProvider, - IEventBusSubscriptionManager eventBusSubscriptionManager, - string exchangeName, + ILogger logger, + string brokerName, string queueName) { - _persistentConnection = persistentConnection; - _logger = logger; + _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); + _subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager)); _serviceProvider = serviceProvider; - _eventBusSubscriptionManager = eventBusSubscriptionManager; - _exchangeName = exchangeName; - _queueName = queueName; + _logger = logger; + _exchangeName = brokerName ?? throw new ArgumentNullException(nameof(brokerName)); + _queueName = queueName ?? throw new ArgumentNullException(nameof(queueName)); + ConfigureMessageBroker(); } - private readonly int _publishRetryCount = 5; - private IModel _consumerChannel; - private readonly TimeSpan _subscribeRetryTime = TimeSpan.FromSeconds(5); - - public void Publish(IntegrationEvent integrationEvent) + public void Publish(TEvent @event) + where TEvent : IntegrationEvent { - if (_persistentConnection.IsConnected) + if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } @@ -52,93 +56,110 @@ public class RabbitMQEventBus : IEventBus (exception, timeSpan) => { _logger.LogWarning(exception, - "Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}.", - integrationEvent.Id, + "Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}.", @event.Id, $"{timeSpan.TotalSeconds:n1}", exception.Message); }); - var eventName = integrationEvent.GetType().Name; - _logger.LogTrace("Creating RabbitMQ channel to publish event #{EventId} ({EventName})...", integrationEvent.Id, + + var eventName = @event.GetType().Name; + + _logger.LogTrace("Creating RabbitMQ channel to publish event #{EventId} ({EventName})...", @event.Id, eventName); + using var channel = _persistentConnection.CreateModel(); - _logger.LogTrace("Declaring RabbitMQ exchange to publish event #{EventId}...", integrationEvent.Id); + _logger.LogTrace("Declaring RabbitMQ exchange to publish event #{EventId}...", @event.Id); + channel.ExchangeDeclare(exchange: _exchangeName, type: "direct"); - var message = integrationEvent.Serialize(); + var message = JsonSerializer.Serialize(@event); var body = Encoding.UTF8.GetBytes(message); + policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; - _logger.LogTrace("Publishing event to RabbitMQ with ID #{EventId}...", integrationEvent.Id); + + _logger.LogTrace("Publishing event to RabbitMQ with ID #{EventId}...", @event.Id); + channel.BasicPublish( exchange: _exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body); - _logger.LogTrace("Published event with ID #{EventId}.", integrationEvent.Id); + + _logger.LogTrace("Published event with ID #{EventId}.", @event.Id); }); } - public void Subscribe() where TEvent : IntegrationEvent + public void Subscribe() + where TEvent : IntegrationEvent where TEventHandler : IIntegrationEventHandler { - var eventName = _eventBusSubscriptionManager.GetEventIdentifier(); + var eventName = _subscriptionsManager.GetEventIdentifier(); var eventHandlerName = typeof(TEventHandler).Name; + AddQueueBindForEventSubscription(eventName); + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}...", eventName, eventHandlerName); - _eventBusSubscriptionManager.AddSubscription(); + + _subscriptionsManager.AddSubscription(); + StartBasicConsume(); + _logger.LogInformation("Subscribed to event {EventName} with {EvenHandler}.", eventName, eventHandlerName); } - public void Unsubscribe() where TEvent : IntegrationEvent + public void Unsubscribe() + where TEvent : IntegrationEvent where TEventHandler : IIntegrationEventHandler { - var eventName = _eventBusSubscriptionManager.GetEventIdentifier(); + var eventName = _subscriptionsManager.GetEventIdentifier(); + _logger.LogInformation("Unsubscribing from event {EventName}...", eventName); - _eventBusSubscriptionManager.RemoveSubscription(); + _subscriptionsManager.RemoveSubscription(); _logger.LogInformation("Unsubscribed from event {EventName}.", eventName); } - private void ConfigureMessageBroker() { _consumerChannel = CreateConsumerChannel(); - _eventBusSubscriptionManager.OnEventRemoved += SubscriptionManager_OnEventRemoved; + _subscriptionsManager.OnEventRemoved += SubscriptionManager_OnEventRemoved; _persistentConnection.OnReconnectedAfterConnectionFailure += PersistentConnection_OnReconnectedAfterConnectionFailure; } - private void PersistentConnection_OnReconnectedAfterConnectionFailure(object sender, EventArgs e) + private IModel CreateConsumerChannel() { - DoCreateConsumerChannel(); - RecreateSubscriptions(); - } + if (!_persistentConnection.IsConnected) + { + _persistentConnection.TryConnect(); + } - private void RecreateSubscriptions() - { - var subscriptions = _eventBusSubscriptionManager.GetAllSubscriptions(); - _eventBusSubscriptionManager.Clear(); + _logger.LogTrace("Creating RabbitMQ consumer channel..."); - var eventBusType = this.GetType(); + var channel = _persistentConnection.CreateModel(); - foreach (var entry in subscriptions) + channel.ExchangeDeclare(exchange: _exchangeName, type: "direct"); + channel.QueueDeclare + ( + queue: _queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: null + ); + + channel.CallbackException += (sender, ea) => { - foreach (var genericSubscribe in entry.Value.Select(subscription => eventBusType.GetMethod("Subscribe") - .MakeGenericMethod(subscription.EventType, subscription.HandlerType))) - { - genericSubscribe.Invoke(this, null); - } - } - } + _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel..."); + DoCreateConsumerChannel(); + }; - private void DoCreateConsumerChannel() - { - _consumerChannel.Dispose(); - _consumerChannel = CreateConsumerChannel(); - StartBasicConsume(); + _logger.LogTrace("Created RabbitMQ consumer channel."); + + + return channel; } private void StartBasicConsume() @@ -191,7 +212,6 @@ public class RabbitMQEventBus : IEventBus } } - private async Task TryEnqueueMessageAgainAsync(BasicDeliverEventArgs eventArgs) { try @@ -214,13 +234,13 @@ public class RabbitMQEventBus : IEventBus { _logger.LogTrace("Processing RabbitMQ event: {EventName}...", eventName); - if (!_eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName)) + if (!_subscriptionsManager.HasSubscriptionsForEvent(eventName)) { _logger.LogTrace("There are no subscriptions for this event."); return; } - var subscriptions = _eventBusSubscriptionManager.GetHandlersForEvent(eventName); + var subscriptions = _subscriptionsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { var handler = _serviceProvider.GetService(subscription.HandlerType); @@ -230,9 +250,9 @@ public class RabbitMQEventBus : IEventBus continue; } - var eventType = _eventBusSubscriptionManager.GetEventTypeByName(eventName); + var eventType = _subscriptionsManager.GetEventTypeByName(eventName); - var @event = JsonConvert.DeserializeObject(message, eventType); + var @event = JsonSerializer.Deserialize(message, eventType); var eventHandlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await Task.Yield(); await (Task)eventHandlerType.GetMethod(nameof(IIntegrationEventHandler.HandleAsync)) @@ -252,46 +272,15 @@ public class RabbitMQEventBus : IEventBus using var channel = _persistentConnection.CreateModel(); channel.QueueUnbind(queue: _queueName, exchange: _exchangeName, routingKey: eventName); - if (_eventBusSubscriptionManager.IsEmpty) + if (_subscriptionsManager.IsEmpty) { _consumerChannel.Close(); } } - private IModel CreateConsumerChannel() - { - if (!_persistentConnection.IsConnected) - { - _persistentConnection.TryConnect(); - } - - _logger.LogTrace("Creating RabbitMQ consumer channel..."); - - var channel = _persistentConnection.CreateModel(); - - channel.ExchangeDeclare(exchange: _exchangeName, type: "direct"); - channel.QueueDeclare - ( - queue: _queueName, - durable: true, - exclusive: false, - autoDelete: false, - arguments: null - ); - - channel.CallbackException += (_, ea) => - { - _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel..."); - DoCreateConsumerChannel(); - }; - - _logger.LogTrace("Created RabbitMQ consumer channel."); - return channel; - } - private void AddQueueBindForEventSubscription(string eventName) { - var containsKey = _eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName); + var containsKey = _subscriptionsManager.HasSubscriptionsForEvent(eventName); if (containsKey) { return; @@ -305,4 +294,35 @@ public class RabbitMQEventBus : IEventBus using var channel = _persistentConnection.CreateModel(); channel.QueueBind(queue: _queueName, exchange: _exchangeName, routingKey: eventName); } + + private void PersistentConnection_OnReconnectedAfterConnectionFailure(object sender, EventArgs e) + { + DoCreateConsumerChannel(); + RecreateSubscriptions(); + } + + private void DoCreateConsumerChannel() + { + _consumerChannel.Dispose(); + _consumerChannel = CreateConsumerChannel(); + StartBasicConsume(); + } + + private void RecreateSubscriptions() + { + var subscriptions = _subscriptionsManager.GetAllSubscriptions(); + _subscriptionsManager.Clear(); + + var eventBusType = GetType(); + + foreach (var entry in subscriptions) + { + foreach (var subscription in entry.Value) + { + var genericSubscribe = eventBusType.GetMethod("Subscribe") + .MakeGenericMethod(subscription.EventType, subscription.HandlerType); + genericSubscribe.Invoke(this, null); + } + } + } } \ No newline at end of file diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs index 2022a2d..10fe3b1 100644 --- a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs @@ -4,7 +4,7 @@ using Polly; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; -namespace Infrastructure.EventBus.RabbitMQ; +namespace Roller.Infrastructure.EventBus.RabbitMQ; public class RabbitMQPersistentConnection( IConnectionFactory connectionFactory, @@ -26,7 +26,7 @@ public class RabbitMQPersistentConnection( lock (_locker) { - // Creates a policy to retry connecting to message broker until it succeeds. + // Creates a policy to retry connecting to message broker until it succeds. var policy = Policy .Handle() .Or() @@ -47,7 +47,7 @@ public class RabbitMQPersistentConnection( return false; } - // These event handlers hadle situations where the connection is lost by any reason. They try to reconnect the client. + // These event handlers hanle situations where the connection is lost by any reason. They try to reconnect the client. _connection.ConnectionShutdown += OnConnectionShutdown; _connection.CallbackException += OnCallbackException; _connection.ConnectionBlocked += OnConnectionBlocked; @@ -59,13 +59,12 @@ public class RabbitMQPersistentConnection( // If the connection has failed previously because of a RabbitMQ shutdown or something similar, we need to guarantee that the exchange and queues exist again. // It's also necessary to rebind all application event handlers. We use this event handler below to do this. - if (!_connectionFailed) + if (_connectionFailed) { - return true; + OnReconnectedAfterConnectionFailure?.Invoke(this, null); + _connectionFailed = false; } - OnReconnectedAfterConnectionFailure?.Invoke(this, null); - _connectionFailed = false; return true; } } @@ -74,7 +73,7 @@ public class RabbitMQPersistentConnection( { if (!IsConnected) { - throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); + throw new InvalidOperationException("No RabbitMQ connections are available to perform this action."); } return _connection.CreateModel(); diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMqConnectionOptions.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMqConnectionOptions.cs new file mode 100644 index 0000000..a25be7a --- /dev/null +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMqConnectionOptions.cs @@ -0,0 +1,26 @@ +namespace Roller.Infrastructure.EventBus.RabbitMQ; + +public class RabbitMqConnectionOptions +{ + public const string SectionName = "RabbitMQ"; + + public const string USER = "RABBITMQ_USER"; + + public const string PASSWORD = "RABBITMQ_PASSWORD"; + + public const string HOST = "RABBITMQ_HOST"; + + public required string Username { get; set; } + + public required string Password { get; set; } + + public required string HostName { get; set; } + + public required string ExchangeName { get; set; } + + public required string QueueName { get; set; } + + public int TimeoutBeforeReconnecting { get; set; } + + public bool DispatchConsumersAsync { get; set; } = true; +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/ServiceCollectionExtensions.cs b/src/Infrastructure.EventBus/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..e07ccda --- /dev/null +++ b/src/Infrastructure.EventBus/ServiceCollectionExtensions.cs @@ -0,0 +1,45 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Roller.Infrastructure.EventBus.RabbitMQ; +using Roller.Infrastructure.EventBus.Subscriptions; + +namespace Roller.Infrastructure.EventBus; + +public static class ServiceCollectionExtensions +{ + public static IServiceCollection AddRollerRabbitMQEventBus(this IServiceCollection services, + IConfiguration configuration) + { + ArgumentNullException.ThrowIfNull(configuration); + ArgumentNullException.ThrowIfNull(services); + + var rabbitMqOptions = configuration.GetSection(RabbitMqConnectionOptions.SectionName) + .Get(); + ArgumentNullException.ThrowIfNull(rabbitMqOptions); + services.AddSingleton(); + services.AddSingleton(factory => + { + var connectionFactory = new ConnectionFactory + { + HostName = configuration[RabbitMqConnectionOptions.HOST] ?? rabbitMqOptions.HostName, + UserName = configuration[RabbitMqConnectionOptions.USER] ?? rabbitMqOptions.Username, + Password = configuration[RabbitMqConnectionOptions.PASSWORD] ?? rabbitMqOptions.Password, + DispatchConsumersAsync = rabbitMqOptions.DispatchConsumersAsync, + }; + + var logger = factory.GetService>(); + return new RabbitMQPersistentConnection(connectionFactory, logger, + rabbitMqOptions.TimeoutBeforeReconnecting); + }); + services.AddSingleton(factory => + { + var persistentConnection = factory.GetService(); + var subscriptionManager = factory.GetService(); + var logger = factory.GetService>(); + + return new RabbitMQEventBus(persistentConnection, subscriptionManager, factory, logger, + rabbitMqOptions.ExchangeName, rabbitMqOptions.QueueName); + }); + return services; + } +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs b/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs index a28d186..e11e67f 100644 --- a/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs +++ b/src/Infrastructure.EventBus/Subscriptions/IEventBusSubscriptionManager.cs @@ -1,4 +1,4 @@ -namespace Infrastructure.EventBus.Subscriptions; +namespace Roller.Infrastructure.EventBus.Subscriptions; public interface IEventBusSubscriptionManager { diff --git a/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs b/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs index d256f23..90d2a04 100644 --- a/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs +++ b/src/Infrastructure.EventBus/Subscriptions/InMemoryEventBusSubscriptionManager.cs @@ -1,4 +1,4 @@ -namespace Infrastructure.EventBus.Subscriptions; +namespace Roller.Infrastructure.EventBus.Subscriptions; public class InMemoryEventBusSubscriptionManager : IEventBusSubscriptionManager { diff --git a/src/Infrastructure.EventBus/Subscriptions/Subscription.cs b/src/Infrastructure.EventBus/Subscriptions/Subscription.cs index 4d17a0e..a46da21 100644 --- a/src/Infrastructure.EventBus/Subscriptions/Subscription.cs +++ b/src/Infrastructure.EventBus/Subscriptions/Subscription.cs @@ -1,4 +1,4 @@ -namespace Infrastructure.EventBus.Subscriptions; +namespace Roller.Infrastructure.EventBus.Subscriptions; public class Subscription {