@ -1,46 +1,50 @@
using System.Net.Sockets ;
using System.Net.Sockets ;
using System.Reflection ;
using System.Text ;
using System.Text ;
using Infrastructure.EventBus.Subscriptions ;
using System.Text.Json ;
using Infrastructure.Utils ;
using Polly ;
using Polly ;
using RabbitMQ.Client.Events ;
using RabbitMQ.Client.Events ;
using RabbitMQ.Client.Exceptions ;
using RabbitMQ.Client.Exceptions ;
using Roller.Infrastructure.EventBus.Subscriptions ;
namespace Infrastructure.EventBus.RabbitMQ;
namespace Roller. Infrastructure.EventBus.RabbitMQ;
public class RabbitMQEventBus : IEventBus
public class RabbitMQEventBus : IEventBus
{
{
private readonly IPersistentConnection _persistentConnection ;
private readonly ILogger < RabbitMQEventBus > _logger ;
private readonly IServiceProvider _serviceProvider ;
private readonly IEventBusSubscriptionManager _eventBusSubscriptionManager ;
private readonly string _exchangeName ;
private readonly string _exchangeName ;
private readonly string _queueName ;
private readonly string _queueName ;
private readonly int _publishRetryCount = 5 ;
private readonly TimeSpan _subscribeRetryTime = TimeSpan . FromSeconds ( 5 ) ;
public RabbitMQEventBus ( IPersistentConnection persistentConnection ,
private readonly IPersistentConnection _persistentConnection ;
ILogger < RabbitMQEventBus > logger ,
private readonly IEventBusSubscriptionManager _subscriptionsManager ;
private readonly IServiceProvider _serviceProvider ;
private readonly ILogger < RabbitMQEventBus > _logger ;
private IModel _consumerChannel ;
public RabbitMQEventBus (
IPersistentConnection persistentConnection ,
IEventBusSubscriptionManager subscriptionsManager ,
IServiceProvider serviceProvider ,
IServiceProvider serviceProvider ,
IEventBusSubscriptionManager eventBusSubscriptionManager ,
I Logger< RabbitMQEventBus > log ger,
string exchangeName ,
string broker Name,
string queueName )
string queueName )
{
{
_persistentConnection = persistentConnection ;
_persistentConnection = persistentConnection ? ? throw new ArgumentNullException ( nameof ( persistentConnection ) ) ;
_logger = logger ;
_ subscriptionsManager = subscriptionsManager ? ? throw new ArgumentNullException ( nameof ( subscriptionsManager ) ) ;
_serviceProvider = serviceProvider ;
_serviceProvider = serviceProvider ;
_eventBusSubscriptionManager = eventBusSubscriptionManager ;
_logger = logger ;
_exchangeName = exchangeName ;
_exchangeName = brokerName ? ? throw new ArgumentNullException ( nameof ( brokerName ) ) ;
_queueName = queueName ;
_queueName = queueName ? ? throw new ArgumentNullException ( nameof ( queueName ) ) ;
ConfigureMessageBroker ( ) ;
ConfigureMessageBroker ( ) ;
}
}
private readonly int _publishRetryCount = 5 ;
public void Publish < TEvent > ( TEvent @event )
private IModel _consumerChannel ;
where TEvent : IntegrationEvent
private readonly TimeSpan _subscribeRetryTime = TimeSpan . FromSeconds ( 5 ) ;
public void Publish ( IntegrationEvent integrationEvent )
{
{
if ( _persistentConnection . IsConnected )
if ( ! _persistentConnection . IsConnected )
{
{
_persistentConnection . TryConnect ( ) ;
_persistentConnection . TryConnect ( ) ;
}
}
@ -52,93 +56,110 @@ public class RabbitMQEventBus : IEventBus
( exception , timeSpan ) = >
( exception , timeSpan ) = >
{
{
_logger . LogWarning ( exception ,
_logger . LogWarning ( exception ,
"Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}." ,
"Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}." , @event . Id ,
integrationEvent . Id ,
$"{timeSpan.TotalSeconds:n1}" , exception . Message ) ;
$"{timeSpan.TotalSeconds:n1}" , exception . Message ) ;
} ) ;
} ) ;
var eventName = integrationEvent . GetType ( ) . Name ;
_logger . LogTrace ( "Creating RabbitMQ channel to publish event #{EventId} ({EventName})..." , integrationEvent . Id ,
var eventName = @event . GetType ( ) . Name ;
_logger . LogTrace ( "Creating RabbitMQ channel to publish event #{EventId} ({EventName})..." , @event . Id ,
eventName ) ;
eventName ) ;
using var channel = _persistentConnection . CreateModel ( ) ;
using var channel = _persistentConnection . CreateModel ( ) ;
_logger . LogTrace ( "Declaring RabbitMQ exchange to publish event #{EventId}..." , integrationEvent . Id ) ;
_logger . LogTrace ( "Declaring RabbitMQ exchange to publish event #{EventId}..." , @event . Id ) ;
channel . ExchangeDeclare ( exchange : _exchangeName , type : "direct" ) ;
channel . ExchangeDeclare ( exchange : _exchangeName , type : "direct" ) ;
var message = integrationEvent. Serialize ( ) ;
var message = JsonSerializer. Serialize ( @event ) ;
var body = Encoding . UTF8 . GetBytes ( message ) ;
var body = Encoding . UTF8 . GetBytes ( message ) ;
policy . Execute ( ( ) = >
policy . Execute ( ( ) = >
{
{
var properties = channel . CreateBasicProperties ( ) ;
var properties = channel . CreateBasicProperties ( ) ;
properties . DeliveryMode = 2 ;
properties . DeliveryMode = 2 ;
_logger . LogTrace ( "Publishing event to RabbitMQ with ID #{EventId}..." , integrationEvent . Id ) ;
_logger . LogTrace ( "Publishing event to RabbitMQ with ID #{EventId}..." , @event . Id ) ;
channel . BasicPublish (
channel . BasicPublish (
exchange : _exchangeName ,
exchange : _exchangeName ,
routingKey : eventName ,
routingKey : eventName ,
mandatory : true ,
mandatory : true ,
basicProperties : properties ,
basicProperties : properties ,
body : body ) ;
body : body ) ;
_logger . LogTrace ( "Published event with ID #{EventId}." , integrationEvent . Id ) ;
_logger . LogTrace ( "Published event with ID #{EventId}." , @event . Id ) ;
} ) ;
} ) ;
}
}
public void Subscribe < TEvent , TEventHandler > ( ) where TEvent : IntegrationEvent
public void Subscribe < TEvent , TEventHandler > ( )
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler < TEvent >
where TEventHandler : IIntegrationEventHandler < TEvent >
{
{
var eventName = _ eventBu sS ubscriptionManager. GetEventIdentifier < TEvent > ( ) ;
var eventName = _ subscriptions Manager. GetEventIdentifier < TEvent > ( ) ;
var eventHandlerName = typeof ( TEventHandler ) . Name ;
var eventHandlerName = typeof ( TEventHandler ) . Name ;
AddQueueBindForEventSubscription ( eventName ) ;
AddQueueBindForEventSubscription ( eventName ) ;
_logger . LogInformation ( "Subscribing to event {EventName} with {EventHandler}..." , eventName , eventHandlerName ) ;
_logger . LogInformation ( "Subscribing to event {EventName} with {EventHandler}..." , eventName , eventHandlerName ) ;
_eventBusSubscriptionManager . AddSubscription < TEvent , TEventHandler > ( ) ;
_subscriptionsManager . AddSubscription < TEvent , TEventHandler > ( ) ;
StartBasicConsume ( ) ;
_logger . LogInformation ( "Subscribed to event {EventName} with {EvenHandler}." , eventName , eventHandlerName ) ;
_logger . LogInformation ( "Subscribed to event {EventName} with {EvenHandler}." , eventName , eventHandlerName ) ;
}
}
public void Unsubscribe < TEvent , TEventHandler > ( ) where TEvent : IntegrationEvent
public void Unsubscribe < TEvent , TEventHandler > ( )
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler < TEvent >
where TEventHandler : IIntegrationEventHandler < TEvent >
{
{
var eventName = _eventBusSubscriptionManager . GetEventIdentifier < TEvent > ( ) ;
var eventName = _subscriptionsManager . GetEventIdentifier < TEvent > ( ) ;
_logger . LogInformation ( "Unsubscribing from event {EventName}..." , eventName ) ;
_logger . LogInformation ( "Unsubscribing from event {EventName}..." , eventName ) ;
_ eventBu sS ubscriptionManager. RemoveSubscription < TEvent , TEventHandler > ( ) ;
_ subscriptions Manager. RemoveSubscription < TEvent , TEventHandler > ( ) ;
_logger . LogInformation ( "Unsubscribed from event {EventName}." , eventName ) ;
_logger . LogInformation ( "Unsubscribed from event {EventName}." , eventName ) ;
}
}
private void ConfigureMessageBroker ( )
private void ConfigureMessageBroker ( )
{
{
_consumerChannel = CreateConsumerChannel ( ) ;
_consumerChannel = CreateConsumerChannel ( ) ;
_ eventBu sS ubscriptionManager. OnEventRemoved + = SubscriptionManager_OnEventRemoved ;
_ subscriptions Manager. OnEventRemoved + = SubscriptionManager_OnEventRemoved ;
_persistentConnection . OnReconnectedAfterConnectionFailure + =
_persistentConnection . OnReconnectedAfterConnectionFailure + =
PersistentConnection_OnReconnectedAfterConnectionFailure ;
PersistentConnection_OnReconnectedAfterConnectionFailure ;
}
}
private void PersistentConnection_OnReconnectedAfterConnectionFailure ( object sender , EventArgs e )
private IModel CreateConsumerChannel ( )
{
{
DoCreateConsumerChannel ( ) ;
if ( ! _persistentConnection . IsConnected )
RecreateSubscriptions ( ) ;
{
_persistentConnection . TryConnect ( ) ;
}
}
private void RecreateSubscriptions ( )
_logger . LogTrace ( "Creating RabbitMQ consumer channel..." ) ;
{
var subscriptions = _eventBusSubscriptionManager . GetAllSubscriptions ( ) ;
_eventBusSubscriptionManager . Clear ( ) ;
var eventBusType = this . GetType ( ) ;
var channel = _persistentConnection . CreateModel ( ) ;
foreach ( var entry in subscriptions )
channel . ExchangeDeclare ( exchange : _exchangeName , type : "direct" ) ;
{
channel . QueueDeclare
foreach ( var genericSubscribe in entry . Value . Select ( subscription = > eventBusType . GetMethod ( "Subscribe" )
(
. MakeGenericMethod ( subscription . EventType , subscription . HandlerType ) ) )
queue : _queueName ,
{
durable : true ,
genericSubscribe . Invoke ( this , null ) ;
exclusive : false ,
}
autoDelete : false ,
}
arguments : null
}
) ;
private void DoCreateConsumerChannel ( )
channel . CallbackException + = ( sender , ea ) = >
{
{
_consumerChannel . Dispose ( ) ;
_logger . LogWarning ( ea . Exception , "Recreating RabbitMQ consumer channel..." ) ;
_consumerChannel = CreateConsumerChannel ( ) ;
DoCreateConsumerChannel ( ) ;
StartBasicConsume ( ) ;
} ;
_logger . LogTrace ( "Created RabbitMQ consumer channel." ) ;
return channel ;
}
}
private void StartBasicConsume ( )
private void StartBasicConsume ( )
@ -191,7 +212,6 @@ public class RabbitMQEventBus : IEventBus
}
}
}
}
private async Task TryEnqueueMessageAgainAsync ( BasicDeliverEventArgs eventArgs )
private async Task TryEnqueueMessageAgainAsync ( BasicDeliverEventArgs eventArgs )
{
{
try
try
@ -214,13 +234,13 @@ public class RabbitMQEventBus : IEventBus
{
{
_logger . LogTrace ( "Processing RabbitMQ event: {EventName}..." , eventName ) ;
_logger . LogTrace ( "Processing RabbitMQ event: {EventName}..." , eventName ) ;
if ( ! _ eventBu sS ubscriptionManager. HasSubscriptionsForEvent ( eventName ) )
if ( ! _ subscriptions Manager. HasSubscriptionsForEvent ( eventName ) )
{
{
_logger . LogTrace ( "There are no subscriptions for this event." ) ;
_logger . LogTrace ( "There are no subscriptions for this event." ) ;
return ;
return ;
}
}
var subscriptions = _ eventBu sS ubscriptionManager. GetHandlersForEvent ( eventName ) ;
var subscriptions = _ subscriptions Manager. GetHandlersForEvent ( eventName ) ;
foreach ( var subscription in subscriptions )
foreach ( var subscription in subscriptions )
{
{
var handler = _serviceProvider . GetService ( subscription . HandlerType ) ;
var handler = _serviceProvider . GetService ( subscription . HandlerType ) ;
@ -230,9 +250,9 @@ public class RabbitMQEventBus : IEventBus
continue ;
continue ;
}
}
var eventType = _ eventBu sS ubscriptionManager. GetEventTypeByName ( eventName ) ;
var eventType = _ subscriptions Manager. GetEventTypeByName ( eventName ) ;
var @event = Json Convert. DeserializeObject ( message , eventType ) ;
var @event = Json Serializer. Deserialize ( message , eventType ) ;
var eventHandlerType = typeof ( IIntegrationEventHandler < > ) . MakeGenericType ( eventType ) ;
var eventHandlerType = typeof ( IIntegrationEventHandler < > ) . MakeGenericType ( eventType ) ;
await Task . Yield ( ) ;
await Task . Yield ( ) ;
await ( Task ) eventHandlerType . GetMethod ( nameof ( IIntegrationEventHandler < IntegrationEvent > . HandleAsync ) )
await ( Task ) eventHandlerType . GetMethod ( nameof ( IIntegrationEventHandler < IntegrationEvent > . HandleAsync ) )
@ -252,57 +272,57 @@ public class RabbitMQEventBus : IEventBus
using var channel = _persistentConnection . CreateModel ( ) ;
using var channel = _persistentConnection . CreateModel ( ) ;
channel . QueueUnbind ( queue : _queueName , exchange : _exchangeName , routingKey : eventName ) ;
channel . QueueUnbind ( queue : _queueName , exchange : _exchangeName , routingKey : eventName ) ;
if ( _ eventBu sS ubscriptionManager. IsEmpty )
if ( _ subscriptions Manager. IsEmpty )
{
{
_consumerChannel . Close ( ) ;
_consumerChannel . Close ( ) ;
}
}
}
}
private IModel CreateConsumerChannel ( )
private void AddQueueBindForEventSubscription ( string eventName )
{
var containsKey = _subscriptionsManager . HasSubscriptionsForEvent ( eventName ) ;
if ( containsKey )
{
{
return ;
}
if ( ! _persistentConnection . IsConnected )
if ( ! _persistentConnection . IsConnected )
{
{
_persistentConnection . TryConnect ( ) ;
_persistentConnection . TryConnect ( ) ;
}
}
_logger . LogTrace ( "Creating RabbitMQ consumer channel..." ) ;
using var channel = _persistentConnection . CreateModel ( ) ;
channel . QueueBind ( queue : _queueName , exchange : _exchangeName , routingKey : eventName ) ;
var channel = _persistentConnection . CreateModel ( ) ;
}
channel . ExchangeDeclare ( exchange : _exchangeName , type : "direct" ) ;
channel . QueueDeclare
(
queue : _queueName ,
durable : true ,
exclusive : false ,
autoDelete : false ,
arguments : null
) ;
channel . CallbackException + = ( _ , ea ) = >
private void PersistentConnection_OnReconnectedAfterConnectionFailure ( object sender , EventArgs e )
{
{
_logger . LogWarning ( ea . Exception , "Recreating RabbitMQ consumer channel..." ) ;
DoCreateConsumerChannel ( ) ;
DoCreateConsumerChannel ( ) ;
} ;
RecreateSubscriptions ( ) ;
_logger . LogTrace ( "Created RabbitMQ consumer channel." ) ;
return channel ;
}
}
private void AddQueueBindForEventSubscription ( string eventName )
private void DoCreateConsumerChannel ( )
{
var containsKey = _eventBusSubscriptionManager . HasSubscriptionsForEvent ( eventName ) ;
if ( containsKey )
{
{
return ;
_consumerChannel . Dispose ( ) ;
_consumerChannel = CreateConsumerChannel ( ) ;
StartBasicConsume ( ) ;
}
}
if ( ! _persistentConnection . IsConnected )
private void RecreateSubscriptions ( )
{
{
_persistentConnection . TryConnect ( ) ;
var subscriptions = _subscriptionsManager . GetAllSubscriptions ( ) ;
}
_subscriptionsManager . Clear ( ) ;
using var channel = _persistentConnection . CreateModel ( ) ;
var eventBusType = GetType ( ) ;
channel . QueueBind ( queue : _queueName , exchange : _exchangeName , routingKey : eventName ) ;
foreach ( var entry in subscriptions )
{
foreach ( var subscription in entry . Value )
{
var genericSubscribe = eventBusType . GetMethod ( "Subscribe" )
. MakeGenericMethod ( subscription . EventType , subscription . HandlerType ) ;
genericSubscribe . Invoke ( this , null ) ;
}
}
}
}
}
}