|
|
@ -28,14 +28,14 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
IEventBusSubscriptionManager subscriptionsManager,
|
|
|
|
IEventBusSubscriptionManager subscriptionsManager,
|
|
|
|
IServiceProvider serviceProvider,
|
|
|
|
IServiceProvider serviceProvider,
|
|
|
|
ILogger<RabbitMQEventBus> logger,
|
|
|
|
ILogger<RabbitMQEventBus> logger,
|
|
|
|
string brokerName,
|
|
|
|
string exchangeName,
|
|
|
|
string queueName)
|
|
|
|
string queueName)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
|
|
|
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
|
|
|
|
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
|
|
|
|
_subscriptionsManager = subscriptionsManager ?? throw new ArgumentNullException(nameof(subscriptionsManager));
|
|
|
|
_serviceProvider = serviceProvider;
|
|
|
|
_serviceProvider = serviceProvider;
|
|
|
|
_logger = logger;
|
|
|
|
_logger = logger;
|
|
|
|
_exchangeName = brokerName ?? throw new ArgumentNullException(nameof(brokerName));
|
|
|
|
_exchangeName = exchangeName ?? throw new ArgumentNullException(nameof(exchangeName));
|
|
|
|
_queueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
|
|
|
|
_queueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
|
|
|
|
|
|
|
|
|
|
|
|
ConfigureMessageBroker();
|
|
|
|
ConfigureMessageBroker();
|
|
|
@ -140,15 +140,8 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
|
|
|
|
|
|
|
|
var channel = _persistentConnection.CreateModel();
|
|
|
|
var channel = _persistentConnection.CreateModel();
|
|
|
|
|
|
|
|
|
|
|
|
channel.ExchangeDeclare(exchange: _exchangeName, type: "direct");
|
|
|
|
channel.ExchangeDeclare(_exchangeName, ExchangeType.Direct);
|
|
|
|
channel.QueueDeclare
|
|
|
|
channel.QueueDeclare(_queueName, true, false, false, null);
|
|
|
|
(
|
|
|
|
|
|
|
|
queue: _queueName,
|
|
|
|
|
|
|
|
durable: true,
|
|
|
|
|
|
|
|
exclusive: false,
|
|
|
|
|
|
|
|
autoDelete: false,
|
|
|
|
|
|
|
|
arguments: null
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
channel.CallbackException += (sender, ea) =>
|
|
|
|
channel.CallbackException += (sender, ea) =>
|
|
|
|
{
|
|
|
|
{
|
|
|
@ -175,12 +168,7 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
|
|
|
|
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
|
|
|
|
consumer.Received += Consumer_Received;
|
|
|
|
consumer.Received += Consumer_Received;
|
|
|
|
|
|
|
|
|
|
|
|
_consumerChannel.BasicConsume
|
|
|
|
_consumerChannel.BasicConsume(_queueName, false, consumer);
|
|
|
|
(
|
|
|
|
|
|
|
|
queue: _queueName,
|
|
|
|
|
|
|
|
autoAck: false,
|
|
|
|
|
|
|
|
consumer: consumer
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_logger.LogTrace("Started RabbitMQ basic consume.");
|
|
|
|
_logger.LogTrace("Started RabbitMQ basic consume.");
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -270,7 +258,7 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
using var channel = _persistentConnection.CreateModel();
|
|
|
|
using var channel = _persistentConnection.CreateModel();
|
|
|
|
channel.QueueUnbind(queue: _queueName, exchange: _exchangeName, routingKey: eventName);
|
|
|
|
channel.QueueUnbind(_queueName, _exchangeName, eventName);
|
|
|
|
|
|
|
|
|
|
|
|
if (_subscriptionsManager.IsEmpty)
|
|
|
|
if (_subscriptionsManager.IsEmpty)
|
|
|
|
{
|
|
|
|
{
|
|
|
@ -292,7 +280,7 @@ public class RabbitMQEventBus : IEventBus
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
using var channel = _persistentConnection.CreateModel();
|
|
|
|
using var channel = _persistentConnection.CreateModel();
|
|
|
|
channel.QueueBind(queue: _queueName, exchange: _exchangeName, routingKey: eventName);
|
|
|
|
channel.QueueBind(_queueName, _exchangeName, eventName);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void PersistentConnection_OnReconnectedAfterConnectionFailure(object sender, EventArgs e)
|
|
|
|
private void PersistentConnection_OnReconnectedAfterConnectionFailure(object sender, EventArgs e)
|
|
|
|