updated event bus

master
Young 7 months ago
parent e3d3724ef6
commit cbd0eaa353

@ -1,6 +0,0 @@
namespace Infrastructure.EventBus;
public interface IDynamicIntegrationEventHandler
{
Task Handle(dynamic eventData);
}

@ -4,11 +4,7 @@ public interface IEventBus
{
void Publish(IntegrationEvent integrationEvent);
void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>;
void Subscribe<TEvent, TEventHandler>() where TEvent : IntegrationEvent where TEventHandler : IIntegrationEventHandler<TEvent>;
void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent;
void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler;
void Unsubscribe<TEvent, TEventHandler>() where TEventHandler : IIntegrationEventHandler<TEvent> where TEvent : IntegrationEvent;
}

@ -1,11 +1,7 @@
namespace Infrastructure.EventBus;
public interface IIntegrationEventHandler
{
}
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
public interface IIntegrationEventHandler<in TIntegrationEvent>
where TIntegrationEvent : IntegrationEvent
{
Task Handle(TIntegrationEvent integrationEvent);
}
Task HandleAsync(TIntegrationEvent integrationEvent);
}

@ -16,4 +16,8 @@
<PackageReference Include="Polly" Version="8.4.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Infrastructure\Infrastructure.csproj" />
</ItemGroup>
</Project>

@ -1,7 +1,8 @@
namespace Infrastructure.EventBus.RabbitMQ;
public interface IRabbitMQPersistentConnection : IDisposable
public interface IPersistentConnection : IDisposable
{
event EventHandler OnReconnectedAfterConnectionFailure;
bool IsConnected { get; }
bool TryConnect();

@ -0,0 +1,308 @@
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using Infrastructure.EventBus.Subscriptions;
using Infrastructure.Utils;
using Polly;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace Infrastructure.EventBus.RabbitMQ;
public class RabbitMQEventBus : IEventBus
{
private readonly IPersistentConnection _persistentConnection;
private readonly ILogger<RabbitMQEventBus> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IEventBusSubscriptionManager _eventBusSubscriptionManager;
private readonly string _exchangeName;
private readonly string _queueName;
public RabbitMQEventBus(IPersistentConnection persistentConnection,
ILogger<RabbitMQEventBus> logger,
IServiceProvider serviceProvider,
IEventBusSubscriptionManager eventBusSubscriptionManager,
string exchangeName,
string queueName)
{
_persistentConnection = persistentConnection;
_logger = logger;
_serviceProvider = serviceProvider;
_eventBusSubscriptionManager = eventBusSubscriptionManager;
_exchangeName = exchangeName;
_queueName = queueName;
ConfigureMessageBroker();
}
private readonly int _publishRetryCount = 5;
private IModel _consumerChannel;
private readonly TimeSpan _subscribeRetryTime = TimeSpan.FromSeconds(5);
public void Publish(IntegrationEvent integrationEvent)
{
if (_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var policy = Policy
.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(_publishRetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan) =>
{
_logger.LogWarning(exception,
"Could not publish event #{EventId} after {Timeout} seconds: {ExceptionMessage}.",
integrationEvent.Id,
$"{timeSpan.TotalSeconds:n1}", exception.Message);
});
var eventName = integrationEvent.GetType().Name;
_logger.LogTrace("Creating RabbitMQ channel to publish event #{EventId} ({EventName})...", integrationEvent.Id,
eventName);
using var channel = _persistentConnection.CreateModel();
_logger.LogTrace("Declaring RabbitMQ exchange to publish event #{EventId}...", integrationEvent.Id);
channel.ExchangeDeclare(exchange: _exchangeName, type: "direct");
var message = integrationEvent.Serialize();
var body = Encoding.UTF8.GetBytes(message);
policy.Execute(() =>
{
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
_logger.LogTrace("Publishing event to RabbitMQ with ID #{EventId}...", integrationEvent.Id);
channel.BasicPublish(
exchange: _exchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body);
_logger.LogTrace("Published event with ID #{EventId}.", integrationEvent.Id);
});
}
public void Subscribe<TEvent, TEventHandler>() where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = _eventBusSubscriptionManager.GetEventIdentifier<TEvent>();
var eventHandlerName = typeof(TEventHandler).Name;
AddQueueBindForEventSubscription(eventName);
_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}...", eventName, eventHandlerName);
_eventBusSubscriptionManager.AddSubscription<TEvent, TEventHandler>();
_logger.LogInformation("Subscribed to event {EventName} with {EvenHandler}.", eventName, eventHandlerName);
}
public void Unsubscribe<TEvent, TEventHandler>() where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = _eventBusSubscriptionManager.GetEventIdentifier<TEvent>();
_logger.LogInformation("Unsubscribing from event {EventName}...", eventName);
_eventBusSubscriptionManager.RemoveSubscription<TEvent, TEventHandler>();
_logger.LogInformation("Unsubscribed from event {EventName}.", eventName);
}
private void ConfigureMessageBroker()
{
_consumerChannel = CreateConsumerChannel();
_eventBusSubscriptionManager.OnEventRemoved += SubscriptionManager_OnEventRemoved;
_persistentConnection.OnReconnectedAfterConnectionFailure +=
PersistentConnection_OnReconnectedAfterConnectionFailure;
}
private void PersistentConnection_OnReconnectedAfterConnectionFailure(object sender, EventArgs e)
{
DoCreateConsumerChannel();
RecreateSubscriptions();
}
private void RecreateSubscriptions()
{
var subscriptions = _eventBusSubscriptionManager.GetAllSubscriptions();
_eventBusSubscriptionManager.Clear();
var eventBusType = this.GetType();
foreach (var entry in subscriptions)
{
foreach (var genericSubscribe in entry.Value.Select(subscription => eventBusType.GetMethod("Subscribe")
.MakeGenericMethod(subscription.EventType, subscription.HandlerType)))
{
genericSubscribe.Invoke(this, null);
}
}
}
private void DoCreateConsumerChannel()
{
_consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel();
StartBasicConsume();
}
private void StartBasicConsume()
{
_logger.LogTrace("Starting RabbitMQ basic consume...");
if (_consumerChannel == null)
{
_logger.LogError("Could not start basic consume because consumer channel is null.");
return;
}
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
consumer.Received += Consumer_Received;
_consumerChannel.BasicConsume
(
queue: _queueName,
autoAck: false,
consumer: consumer
);
_logger.LogTrace("Started RabbitMQ basic consume.");
}
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
var eventName = eventArgs.RoutingKey;
var message = Encoding.UTF8.GetString(eventArgs.Body.Span);
var isAcknowledged = false;
try
{
await ProcessEvent(eventName, message);
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
isAcknowledged = true;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error processing the following message: {Message}.", message);
}
finally
{
if (!isAcknowledged)
{
await TryEnqueueMessageAgainAsync(eventArgs);
}
}
}
private async Task TryEnqueueMessageAgainAsync(BasicDeliverEventArgs eventArgs)
{
try
{
_logger.LogWarning("Adding message to queue again with {Time} seconds delay...",
$"{_subscribeRetryTime.TotalSeconds:n1}");
await Task.Delay(_subscribeRetryTime);
_consumerChannel.BasicNack(eventArgs.DeliveryTag, false, true);
_logger.LogTrace("Message added to queue again.");
}
catch (Exception ex)
{
_logger.LogError("Could not enqueue message again: {Error}.", ex.Message);
}
}
private async Task ProcessEvent(string eventName, string message)
{
_logger.LogTrace("Processing RabbitMQ event: {EventName}...", eventName);
if (!_eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName))
{
_logger.LogTrace("There are no subscriptions for this event.");
return;
}
var subscriptions = _eventBusSubscriptionManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
var handler = _serviceProvider.GetService(subscription.HandlerType);
if (handler == null)
{
_logger.LogWarning("There are no handlers for the following event: {EventName}", eventName);
continue;
}
var eventType = _eventBusSubscriptionManager.GetEventTypeByName(eventName);
var @event = JsonConvert.DeserializeObject(message, eventType);
var eventHandlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await Task.Yield();
await (Task)eventHandlerType.GetMethod(nameof(IIntegrationEventHandler<IntegrationEvent>.HandleAsync))
.Invoke(handler, [@event]);
}
_logger.LogTrace("Processed event {EventName}.", eventName);
}
private void SubscriptionManager_OnEventRemoved(object sender, string eventName)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using var channel = _persistentConnection.CreateModel();
channel.QueueUnbind(queue: _queueName, exchange: _exchangeName, routingKey: eventName);
if (_eventBusSubscriptionManager.IsEmpty)
{
_consumerChannel.Close();
}
}
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
_logger.LogTrace("Creating RabbitMQ consumer channel...");
var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: _exchangeName, type: "direct");
channel.QueueDeclare
(
queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);
channel.CallbackException += (_, ea) =>
{
_logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel...");
DoCreateConsumerChannel();
};
_logger.LogTrace("Created RabbitMQ consumer channel.");
return channel;
}
private void AddQueueBindForEventSubscription(string eventName)
{
var containsKey = _eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName);
if (containsKey)
{
return;
}
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using var channel = _persistentConnection.CreateModel();
channel.QueueBind(queue: _queueName, exchange: _exchangeName, routingKey: eventName);
}
}

@ -1,7 +1,6 @@
using System.Net.Sockets;
using System.Text;
using Polly;
using Polly.Retry;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
@ -10,58 +9,64 @@ namespace Infrastructure.EventBus.RabbitMQ;
public class RabbitMQPersistentConnection(
IConnectionFactory connectionFactory,
ILogger<RabbitMQPersistentConnection> logger,
int retryCount = 5)
: IRabbitMQPersistentConnection
int timeoutBeforeReconnecting = 15)
: IPersistentConnection
{
private IConnection? _connection;
private IConnection _connection;
private bool _disposed;
private object sync_root = new object();
public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
private readonly object _locker = new();
private bool _connectionFailed;
private readonly TimeSpan _timeoutBeforeReconnecting = TimeSpan.FromSeconds(timeoutBeforeReconnecting);
public event EventHandler? OnReconnectedAfterConnectionFailure;
public bool IsConnected => _connection is { IsOpen: true } && !_disposed;
public bool TryConnect()
{
logger.LogInformation("RabbitMQ Client is trying to connect");
logger.LogInformation("Trying to connect to RabbitMQ...");
lock (sync_root)
lock (_locker)
{
var policy = RetryPolicy.Handle<SocketException>()
// Creates a policy to retry connecting to message broker until it succeeds.
var policy = Policy
.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(retryCount,
retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
.WaitAndRetryForever((duration) => _timeoutBeforeReconnecting,
(ex, time) =>
{
logger.LogWarning(ex,
"RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})",
$"{time.TotalSeconds:n1}", ex.Message);
}
);
"RabbitMQ Client could not connect after {TimeOut} seconds ({ExceptionMessage}). Waiting to try again...",
$"{(int)time.TotalSeconds}", ex.Message);
});
policy.Execute(() =>
{
_connection = connectionFactory
.CreateConnection();
});
policy.Execute(() => { _connection = connectionFactory.CreateConnection(); });
if (IsConnected)
if (!IsConnected)
{
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
logger.LogCritical("ERROR: could not connect to RabbitMQ.");
_connectionFailed = true;
return false;
}
// These event handlers hadle situations where the connection is lost by any reason. They try to reconnect the client.
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
_connection.ConnectionUnblocked += OnConnectionUnblocked;
logger.LogInformation(
"RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events",
_connection.Endpoint.HostName);
logger.LogInformation(
"RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events",
_connection.Endpoint.HostName);
// If the connection has failed previously because of a RabbitMQ shutdown or something similar, we need to guarantee that the exchange and queues exist again.
// It's also necessary to rebind all application event handlers. We use this event handler below to do this.
if (!_connectionFailed)
{
return true;
}
else
{
logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false;
}
OnReconnectedAfterConnectionFailure?.Invoke(this, null);
_connectionFailed = false;
return true;
}
}
@ -89,16 +94,13 @@ public class RabbitMQPersistentConnection(
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>(
async (a, b) =>
{
var Headers = b.BasicProperties.Headers;
var msgBody = b.Body.ToArray();
var message = Encoding.UTF8.GetString(msgBody);
await Task.CompletedTask;
Console.WriteLine("Received message: {0}", message);
}
);
consumer.Received += async (a, b) =>
{
var msgBody = b.Body.ToArray();
var message = Encoding.UTF8.GetString(msgBody);
await Task.CompletedTask;
Console.WriteLine("Received message: {0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
@ -107,7 +109,10 @@ public class RabbitMQPersistentConnection(
public void Dispose()
{
if (_disposed) return;
if (_disposed)
{
return;
}
_disposed = true;
@ -123,37 +128,43 @@ public class RabbitMQPersistentConnection(
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed)
{
return;
}
_connectionFailed = true;
logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
TryConnect();
TryConnectIfNotDisposed();
}
private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed)
{
return;
}
_connectionFailed = true;
logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
TryConnect();
TryConnectIfNotDisposed();
}
private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
_connectionFailed = true;
logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
TryConnectIfNotDisposed();
}
private void OnConnectionUnblocked(object sender, EventArgs args)
{
_connectionFailed = true;
logger.LogWarning("A RabbitMQ connection is unblocked. Trying to re-connect...");
TryConnectIfNotDisposed();
}
private void TryConnectIfNotDisposed()
{
if (_disposed)
{
logger.LogInformation("RabbitMQ client is disposed. No action will be taken.");
return;
}
logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
TryConnect();
}
}

@ -1,38 +0,0 @@
namespace Infrastructure.EventBus.RabbitMQ;
public class RabbitMqEventBus(
IRabbitMQPersistentConnection persistentConnection,
ILogger<RabbitMqEventBus> logger
) : IEventBus, IDisposable
{
public void Dispose()
{
// TODO release managed resources here
}
public void Publish(IntegrationEvent integrationEvent)
{
throw new NotImplementedException();
}
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
throw new NotImplementedException();
}
public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
throw new NotImplementedException();
}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
}

@ -0,0 +1,28 @@
namespace Infrastructure.EventBus.Subscriptions;
public interface IEventBusSubscriptionManager
{
event EventHandler<string> OnEventRemoved;
bool IsEmpty { get; }
bool HasSubscriptionsForEvent(string eventName);
string GetEventIdentifier<TEvent>();
Type GetEventTypeByName(string eventName);
IEnumerable<Subscription> GetHandlersForEvent(string eventName);
Dictionary<string, List<Subscription>> GetAllSubscriptions();
void AddSubscription<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>;
void RemoveSubscription<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>;
void Clear();
}

@ -0,0 +1,109 @@
namespace Infrastructure.EventBus.Subscriptions;
public class InMemoryEventBusSubscriptionManager : IEventBusSubscriptionManager
{
private readonly Dictionary<string, List<Subscription>> _handlers = new();
private readonly List<Type> _eventTypes = [];
public event EventHandler<string> OnEventRemoved;
public string GetEventIdentifier<TEvent>() => typeof(TEvent).Name;
public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);
public IEnumerable<Subscription> GetHandlersForEvent(string eventName) => _handlers[eventName];
public Dictionary<string, List<Subscription>> GetAllSubscriptions() => new(_handlers);
public void AddSubscription<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = GetEventIdentifier<TEvent>();
DoAddSubscription(typeof(TEvent), typeof(TEventHandler), eventName);
if (!_eventTypes.Contains(typeof(TEvent)))
{
_eventTypes.Add(typeof(TEvent));
}
}
public void RemoveSubscription<TEvent, TEventHandler>()
where TEventHandler : IIntegrationEventHandler<TEvent>
where TEvent : IntegrationEvent
{
var handlerToRemove = FindSubscriptionToRemove<TEvent, TEventHandler>();
var eventName = GetEventIdentifier<TEvent>();
DoRemoveHandler(eventName, handlerToRemove);
}
public void Clear()
{
_handlers.Clear();
_eventTypes.Clear();
}
public bool IsEmpty => !_handlers.Keys.Any();
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
private void DoAddSubscription(Type eventType, Type handlerType, string eventName)
{
if (!HasSubscriptionsForEvent(eventName))
{
_handlers.Add(eventName, new List<Subscription>());
}
if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
{
throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'",
nameof(handlerType));
}
_handlers[eventName].Add(new Subscription(eventType, handlerType));
}
private void DoRemoveHandler(string eventName, Subscription subscriptionToRemove)
{
if (subscriptionToRemove == null)
{
return;
}
_handlers[eventName].Remove(subscriptionToRemove);
if (_handlers[eventName].Any())
{
return;
}
_handlers.Remove(eventName);
var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
if (eventType != null)
{
_eventTypes.Remove(eventType);
}
RaiseOnEventRemoved(eventName);
}
private void RaiseOnEventRemoved(string eventName)
{
var handler = OnEventRemoved;
handler?.Invoke(this, eventName);
}
private Subscription FindSubscriptionToRemove<TEvent, TEventHandler>()
where TEvent : IntegrationEvent
where TEventHandler : IIntegrationEventHandler<TEvent>
{
var eventName = GetEventIdentifier<TEvent>();
return DoFindSubscriptionToRemove(eventName, typeof(TEventHandler));
}
private Subscription DoFindSubscriptionToRemove(string eventName, Type handlerType)
{
return !HasSubscriptionsForEvent(eventName) ? null : _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
}
}

@ -0,0 +1,13 @@
namespace Infrastructure.EventBus.Subscriptions;
public class Subscription
{
public Type EventType { get; private set; }
public Type HandlerType { get; private set; }
public Subscription(Type eventType, Type handlerType)
{
EventType = eventType;
HandlerType = handlerType;
}
}
Loading…
Cancel
Save