diff --git a/src/Infrastructure.EventBus/GlobalUsings.cs b/src/Infrastructure.EventBus/GlobalUsings.cs new file mode 100644 index 0000000..fdab710 --- /dev/null +++ b/src/Infrastructure.EventBus/GlobalUsings.cs @@ -0,0 +1,5 @@ +// Global using directives + +global using Microsoft.Extensions.Logging; +global using Newtonsoft.Json; +global using RabbitMQ.Client; \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs b/src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs new file mode 100644 index 0000000..260fc2f --- /dev/null +++ b/src/Infrastructure.EventBus/IDynamicIntegrationEventHandler.cs @@ -0,0 +1,6 @@ +namespace Infrastructure.EventBus; + +public interface IDynamicIntegrationEventHandler +{ + Task Handle(dynamic eventData); +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IEventBus.cs b/src/Infrastructure.EventBus/IEventBus.cs new file mode 100644 index 0000000..0d9d013 --- /dev/null +++ b/src/Infrastructure.EventBus/IEventBus.cs @@ -0,0 +1,14 @@ +namespace Infrastructure.EventBus; + +public interface IEventBus +{ + void Publish(IntegrationEvent integrationEvent); + + void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler; + + void Unsubscribe() where TH : IIntegrationEventHandler where T : IntegrationEvent; + + void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler; + + void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler; +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/IIntegrationEventHandler.cs b/src/Infrastructure.EventBus/IIntegrationEventHandler.cs new file mode 100644 index 0000000..ebf8228 --- /dev/null +++ b/src/Infrastructure.EventBus/IIntegrationEventHandler.cs @@ -0,0 +1,11 @@ +namespace Infrastructure.EventBus; + +public interface IIntegrationEventHandler +{ +} + +public interface IIntegrationEventHandler : IIntegrationEventHandler + where TIntegrationEvent : IntegrationEvent +{ + Task Handle(TIntegrationEvent integrationEvent); +} diff --git a/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj b/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj new file mode 100644 index 0000000..6f5cb7e --- /dev/null +++ b/src/Infrastructure.EventBus/Infrastructure.EventBus.csproj @@ -0,0 +1,19 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + diff --git a/src/Infrastructure.EventBus/IntegrationEvent.cs b/src/Infrastructure.EventBus/IntegrationEvent.cs new file mode 100644 index 0000000..91dbe82 --- /dev/null +++ b/src/Infrastructure.EventBus/IntegrationEvent.cs @@ -0,0 +1,13 @@ +namespace Infrastructure.EventBus; + +[method: JsonConstructor] +public class IntegrationEvent(Guid id, DateTime createdDate) +{ + public IntegrationEvent() : this(Guid.NewGuid(), DateTime.UtcNow) + { + } + + [JsonProperty] public Guid Id { get; private set; } = id; + + [JsonProperty] public DateTime CreatedDate { get; private set; } = createdDate; +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/RabbitMQ/IRabbitMQPersistentConnection.cs b/src/Infrastructure.EventBus/RabbitMQ/IRabbitMQPersistentConnection.cs new file mode 100644 index 0000000..50e7b7e --- /dev/null +++ b/src/Infrastructure.EventBus/RabbitMQ/IRabbitMQPersistentConnection.cs @@ -0,0 +1,14 @@ +namespace Infrastructure.EventBus.RabbitMQ; + +public interface IRabbitMQPersistentConnection : IDisposable +{ + bool IsConnected { get; } + + bool TryConnect(); + + IModel CreateModel(); + + void PublishMessage(string message, string exchangeName, string routingKey); + + void StartConsuming(string queueName); +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs new file mode 100644 index 0000000..ecd49f0 --- /dev/null +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMQPersistentConnection.cs @@ -0,0 +1,159 @@ +using System.Net.Sockets; +using System.Text; +using Polly; +using Polly.Retry; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; + +namespace Infrastructure.EventBus.RabbitMQ; + +public class RabbitMQPersistentConnection( + IConnectionFactory connectionFactory, + ILogger logger, + int retryCount = 5) + : IRabbitMQPersistentConnection +{ + private IConnection? _connection; + private bool _disposed; + private object sync_root = new object(); + + public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed; + + public bool TryConnect() + { + logger.LogInformation("RabbitMQ Client is trying to connect"); + + lock (sync_root) + { + var policy = RetryPolicy.Handle() + .Or() + .WaitAndRetry(retryCount, + retryAttempt => + TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + logger.LogWarning(ex, + "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", + $"{time.TotalSeconds:n1}", ex.Message); + } + ); + + policy.Execute(() => + { + _connection = connectionFactory + .CreateConnection(); + }); + + if (IsConnected) + { + _connection.ConnectionShutdown += OnConnectionShutdown; + _connection.CallbackException += OnCallbackException; + _connection.ConnectionBlocked += OnConnectionBlocked; + + logger.LogInformation( + "RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", + _connection.Endpoint.HostName); + + return true; + } + else + { + logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); + + return false; + } + } + } + + public IModel CreateModel() + { + if (!IsConnected) + { + throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); + } + + return _connection.CreateModel(); + } + + public void PublishMessage(string message, string exchangeName, string routingKey) + { + using var channel = CreateModel(); + channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, true); + var body = Encoding.UTF8.GetBytes(message); + channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body); + } + + public void StartConsuming(string queueName) + { + using var channel = CreateModel(); + channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); + + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.Received += new AsyncEventHandler( + async (a, b) => + { + var Headers = b.BasicProperties.Headers; + var msgBody = b.Body.ToArray(); + var message = Encoding.UTF8.GetString(msgBody); + await Task.CompletedTask; + Console.WriteLine("Received message: {0}", message); + } + ); + + channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); + + Console.WriteLine("Consuming messages..."); + } + + public void Dispose() + { + if (_disposed) return; + + _disposed = true; + + try + { + _connection.Dispose(); + } + catch (IOException ex) + { + logger.LogCritical(ex.ToString()); + } + } + + private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + { + return; + } + + logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); + + TryConnect(); + } + + private void OnCallbackException(object sender, CallbackExceptionEventArgs e) + { + if (_disposed) + { + return; + } + + logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); + + TryConnect(); + } + + private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) + { + if (_disposed) + { + return; + } + + logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); + + TryConnect(); + } +} \ No newline at end of file diff --git a/src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs b/src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs new file mode 100644 index 0000000..4b60cf9 --- /dev/null +++ b/src/Infrastructure.EventBus/RabbitMQ/RabbitMqEventBus.cs @@ -0,0 +1,38 @@ +namespace Infrastructure.EventBus.RabbitMQ; + +public class RabbitMqEventBus( + IRabbitMQPersistentConnection persistentConnection, + ILogger logger + + ) : IEventBus, IDisposable +{ + public void Dispose() + { + // TODO release managed resources here + } + + public void Publish(IntegrationEvent integrationEvent) + { + throw new NotImplementedException(); + } + + public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + throw new NotImplementedException(); + } + + public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler + { + throw new NotImplementedException(); + } + + public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler + { + throw new NotImplementedException(); + } + + public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/src/Infrastructure.sln b/src/Infrastructure.sln index e11dcac..d674b86 100644 --- a/src/Infrastructure.sln +++ b/src/Infrastructure.sln @@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Infrastructure", "Infrastru EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Infrastructure.Tests", "Infrastructure.Tests\Infrastructure.Tests.csproj", "{47915B09-D98E-4BA2-8B07-4C18056A7191}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Infrastructure.EventBus", "Infrastructure.EventBus\Infrastructure.EventBus.csproj", "{958347CA-1726-4F3C-B701-5B9CC6603235}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,6 +23,10 @@ Global {47915B09-D98E-4BA2-8B07-4C18056A7191}.Debug|Any CPU.Build.0 = Debug|Any CPU {47915B09-D98E-4BA2-8B07-4C18056A7191}.Release|Any CPU.ActiveCfg = Release|Any CPU {47915B09-D98E-4BA2-8B07-4C18056A7191}.Release|Any CPU.Build.0 = Release|Any CPU + {958347CA-1726-4F3C-B701-5B9CC6603235}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {958347CA-1726-4F3C-B701-5B9CC6603235}.Debug|Any CPU.Build.0 = Debug|Any CPU + {958347CA-1726-4F3C-B701-5B9CC6603235}.Release|Any CPU.ActiveCfg = Release|Any CPU + {958347CA-1726-4F3C-B701-5B9CC6603235}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE