added event bus

master
Young 7 months ago
parent c6a36c055d
commit f2e12b7f7e

@ -0,0 +1,5 @@
// Global using directives
global using Microsoft.Extensions.Logging;
global using Newtonsoft.Json;
global using RabbitMQ.Client;

@ -0,0 +1,6 @@
namespace Infrastructure.EventBus;
public interface IDynamicIntegrationEventHandler
{
Task Handle(dynamic eventData);
}

@ -0,0 +1,14 @@
namespace Infrastructure.EventBus;
public interface IEventBus
{
void Publish(IntegrationEvent integrationEvent);
void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>;
void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent;
void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler;
}

@ -0,0 +1,11 @@
namespace Infrastructure.EventBus;
public interface IIntegrationEventHandler
{
}
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
where TIntegrationEvent : IntegrationEvent
{
Task Handle(TIntegrationEvent integrationEvent);
}

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3"/>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1"/>
<PackageReference Include="Polly" Version="8.2.0"/>
</ItemGroup>
</Project>

@ -0,0 +1,13 @@
namespace Infrastructure.EventBus;
[method: JsonConstructor]
public class IntegrationEvent(Guid id, DateTime createdDate)
{
public IntegrationEvent() : this(Guid.NewGuid(), DateTime.UtcNow)
{
}
[JsonProperty] public Guid Id { get; private set; } = id;
[JsonProperty] public DateTime CreatedDate { get; private set; } = createdDate;
}

@ -0,0 +1,14 @@
namespace Infrastructure.EventBus.RabbitMQ;
public interface IRabbitMQPersistentConnection : IDisposable
{
bool IsConnected { get; }
bool TryConnect();
IModel CreateModel();
void PublishMessage(string message, string exchangeName, string routingKey);
void StartConsuming(string queueName);
}

@ -0,0 +1,159 @@
using System.Net.Sockets;
using System.Text;
using Polly;
using Polly.Retry;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace Infrastructure.EventBus.RabbitMQ;
public class RabbitMQPersistentConnection(
IConnectionFactory connectionFactory,
ILogger<RabbitMQPersistentConnection> logger,
int retryCount = 5)
: IRabbitMQPersistentConnection
{
private IConnection? _connection;
private bool _disposed;
private object sync_root = new object();
public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
public bool TryConnect()
{
logger.LogInformation("RabbitMQ Client is trying to connect");
lock (sync_root)
{
var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(retryCount,
retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(ex, time) =>
{
logger.LogWarning(ex,
"RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})",
$"{time.TotalSeconds:n1}", ex.Message);
}
);
policy.Execute(() =>
{
_connection = connectionFactory
.CreateConnection();
});
if (IsConnected)
{
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
logger.LogInformation(
"RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events",
_connection.Endpoint.HostName);
return true;
}
else
{
logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false;
}
}
}
public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel();
}
public void PublishMessage(string message, string exchangeName, string routingKey)
{
using var channel = CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, true);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
}
public void StartConsuming(string queueName)
{
using var channel = CreateModel();
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>(
async (a, b) =>
{
var Headers = b.BasicProperties.Headers;
var msgBody = b.Body.ToArray();
var message = Encoding.UTF8.GetString(msgBody);
await Task.CompletedTask;
Console.WriteLine("Received message: {0}", message);
}
);
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Consuming messages...");
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection.Dispose();
}
catch (IOException ex)
{
logger.LogCritical(ex.ToString());
}
}
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed)
{
return;
}
logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
TryConnect();
}
private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed)
{
return;
}
logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
TryConnect();
}
private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed)
{
return;
}
logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
TryConnect();
}
}

@ -0,0 +1,38 @@
namespace Infrastructure.EventBus.RabbitMQ;
public class RabbitMqEventBus(
IRabbitMQPersistentConnection persistentConnection,
ILogger<RabbitMqEventBus> logger
) : IEventBus, IDisposable
{
public void Dispose()
{
// TODO release managed resources here
}
public void Publish(IntegrationEvent integrationEvent)
{
throw new NotImplementedException();
}
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
throw new NotImplementedException();
}
public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
throw new NotImplementedException();
}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
}

@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Infrastructure", "Infrastru
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Infrastructure.Tests", "Infrastructure.Tests\Infrastructure.Tests.csproj", "{47915B09-D98E-4BA2-8B07-4C18056A7191}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Infrastructure.EventBus", "Infrastructure.EventBus\Infrastructure.EventBus.csproj", "{958347CA-1726-4F3C-B701-5B9CC6603235}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -21,6 +23,10 @@ Global
{47915B09-D98E-4BA2-8B07-4C18056A7191}.Debug|Any CPU.Build.0 = Debug|Any CPU
{47915B09-D98E-4BA2-8B07-4C18056A7191}.Release|Any CPU.ActiveCfg = Release|Any CPU
{47915B09-D98E-4BA2-8B07-4C18056A7191}.Release|Any CPU.Build.0 = Release|Any CPU
{958347CA-1726-4F3C-B701-5B9CC6603235}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{958347CA-1726-4F3C-B701-5B9CC6603235}.Debug|Any CPU.Build.0 = Debug|Any CPU
{958347CA-1726-4F3C-B701-5B9CC6603235}.Release|Any CPU.ActiveCfg = Release|Any CPU
{958347CA-1726-4F3C-B701-5B9CC6603235}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

Loading…
Cancel
Save