|
|
@ -21,21 +21,21 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
|
|
|
|
|
|
|
|
private readonly ILogger<RabbitMQEventBus> _logger;
|
|
|
|
private readonly ILogger<RabbitMQEventBus> _logger;
|
|
|
|
|
|
|
|
|
|
|
|
private IModel _consumerChannel;
|
|
|
|
private IModel? _consumerChannel;
|
|
|
|
|
|
|
|
|
|
|
|
public RabbitMQEventBus(
|
|
|
|
public RabbitMQEventBus(
|
|
|
|
IPersistentConnection persistentConnection,
|
|
|
|
IPersistentConnection persistentConnection,
|
|
|
|
IEventBusSubscriptionManager subscriptionsManager,
|
|
|
|
IEventBusSubscriptionManager subscriptionsManager,
|
|
|
|
IServiceProvider serviceProvider,
|
|
|
|
IServiceProvider serviceProvider,
|
|
|
|
ILogger<RabbitMQEventBus> logger,
|
|
|
|
ILogger<RabbitMQEventBus> logger,
|
|
|
|
string exchangeName,
|
|
|
|
string brokerName,
|
|
|
|
string queueName)
|
|
|
|
string queueName)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
|
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
|
|
|
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
|
|
|
|
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
|
|
|
|
_serviceProvider = serviceProvider;
|
|
|
|
_serviceProvider = serviceProvider;
|
|
|
|
_logger = logger;
|
|
|
|
_logger = logger;
|
|
|
|
_exchangeName = exchangeName ?? throw new ArgumentNullException(nameof(exchangeName));
|
|
|
|
_exchangeName = brokerName ?? throw new ArgumentNullException(nameof(brokerName));
|
|
|
|
_queueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
|
|
|
|
_queueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
|
|
|
|
|
|
|
|
|
|
|
|
ConfigureMessageBroker();
|
|
|
|
ConfigureMessageBroker();
|
|
|
@ -68,7 +68,7 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
using var channel = _persistentConnection.CreateModel();
|
|
|
|
using var channel = _persistentConnection.CreateModel();
|
|
|
|
_logger.LogTrace("Declaring RabbitMQ exchange to publish event #{EventId}...", @event.Id);
|
|
|
|
_logger.LogTrace("Declaring RabbitMQ exchange to publish event #{EventId}...", @event.Id);
|
|
|
|
|
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: _exchangeName, type: "direct");
|
|
|
|
channel.ExchangeDeclare(_exchangeName, ExchangeType.Direct);
|
|
|
|
|
|
|
|
|
|
|
|
var message = JsonSerializer.Serialize(@event);
|
|
|
|
var message = JsonSerializer.Serialize(@event);
|
|
|
|
var body = Encoding.UTF8.GetBytes(message);
|
|
|
|
var body = Encoding.UTF8.GetBytes(message);
|
|
|
@ -80,13 +80,7 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogTrace("Publishing event to RabbitMQ with ID #{EventId}...", @event.Id);
|
|
|
|
_logger.LogTrace("Publishing event to RabbitMQ with ID #{EventId}...", @event.Id);
|
|
|
|
|
|
|
|
|
|
|
|
channel.BasicPublish(
|
|
|
|
channel.BasicPublish(_exchangeName, eventName, true, properties, body);
|
|
|
|
exchange: _exchangeName,
|
|
|
|
|
|
|
|
routingKey: eventName,
|
|
|
|
|
|
|
|
mandatory: true,
|
|
|
|
|
|
|
|
basicProperties: properties,
|
|
|
|
|
|
|
|
body: body);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogTrace("Published event with ID #{EventId}.", @event.Id);
|
|
|
|
_logger.LogTrace("Published event with ID #{EventId}.", @event.Id);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -243,8 +237,8 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
var @event = JsonSerializer.Deserialize(message, eventType);
|
|
|
|
var @event = JsonSerializer.Deserialize(message, eventType);
|
|
|
|
var eventHandlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
|
|
|
|
var eventHandlerType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
|
|
|
|
await Task.Yield();
|
|
|
|
await Task.Yield();
|
|
|
|
await ((Task)eventHandlerType.GetMethod(nameof(IIntegrationEventHandler<IntegrationEvent>.HandleAsync))
|
|
|
|
await (Task)eventHandlerType.GetMethod(nameof(IIntegrationEventHandler<IntegrationEvent>.HandleAsync))
|
|
|
|
.Invoke(handler, [@event]))!;
|
|
|
|
.Invoke(handler, [@event]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogTrace("Processed event {EventName}.", eventName);
|
|
|
|
_logger.LogTrace("Processed event {EventName}.", eventName);
|
|
|
|