diff --git a/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs index b126d978..10b6ce25 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs @@ -28,7 +28,7 @@ protected override void Dispose(bool disposing) protected override void Build() { base.Build(); - AddInit(InitAsync()); + InitTaskList.Add(InitAsync(), CancellationToken); } private byte[] GetMessagePayload(Message message) @@ -94,20 +94,27 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor /// private async Task InitAsync() { - _logger.LogInformation("Ensuring client is authenticate"); - // Ensure the client finished the first authentication - await _clientProvider.EnsureClientAuthenticated(); + try + { + _logger.LogInformation("Ensuring client is authenticate"); + // Ensure the client finished the first authentication + await _clientProvider.EnsureClientAuthenticated(); + + // Provision the topology if enabled + if (ProviderSettings.TopologyProvisioning?.Enabled ?? false) + { + _logger.LogInformation("Provisioning topology"); + await ProvisionTopology(); + } - // Provision the topology if enabled - if (ProviderSettings.TopologyProvisioning?.Enabled ?? false) + // Read the Queue/Topic URLs for the producers + _logger.LogInformation("Populating queue URLs"); + await PopulatePathToUrlMappings(); + } + catch (Exception ex) { - _logger.LogInformation("Provisioning topology"); - await ProvisionTopology(); + _logger.LogError(ex, "SQS Transport initialization failed: {ErrorMessage}", ex.Message); } - - // Read the Queue/Topic URLs for the producers - _logger.LogInformation("Populating queue URLs"); - await PopulatePathToUrlMappings(); } public override async Task ProvisionTopology() diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs index 156efc4e..a0f425d9 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs @@ -59,7 +59,7 @@ protected override void Build() if (ProviderSettings.TopologyProvisioning?.Enabled ?? false) { - AddInit(ProvisionTopology()); + InitTaskList.Add(ProvisionTopology(), CancellationToken); } _client = ProviderSettings.ClientFactory(); diff --git a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs index 79af07d8..6ff5462d 100644 --- a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs +++ b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs @@ -23,7 +23,7 @@ public NatsMessageBus(MessageBusSettings settings, NatsMessageBusSettings provid protected override void Build() { base.Build(); - AddInit(CreateConnectionAsync()); + InitTaskList.Add(CreateConnectionAsync(), CancellationToken); } private Task CreateConnectionAsync() diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs index e8207191..d8c887b1 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs @@ -28,7 +28,7 @@ protected override void Build() { base.Build(); - AddInit(CreateConnection()); + InitTaskList.Add(CreateConnection(), CancellationToken); } protected override async Task CreateConsumers() diff --git a/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs b/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs index 305bd7a8..cb506f27 100644 --- a/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs +++ b/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs @@ -11,7 +11,7 @@ protected override void Build() { base.Build(); - AddInit(ProvisionTopology()); + InitTaskList.Add(ProvisionTopology(), CancellationToken); } public override async Task ProvisionTopology() diff --git a/src/SlimMessageBus.Host/Collections/AsyncTaskList.cs b/src/SlimMessageBus.Host/Collections/AsyncTaskList.cs new file mode 100644 index 00000000..14b05846 --- /dev/null +++ b/src/SlimMessageBus.Host/Collections/AsyncTaskList.cs @@ -0,0 +1,48 @@ +namespace SlimMessageBus.Host.Collections; + +public interface IAsyncTaskList +{ + void Add(Task task, CancellationToken cancellationToken); + Task EnsureAllFinished(); +} + +/// +/// Tracks a list of Tasks that have to be awaited before we can proceed. +/// +public class AsyncTaskList : IAsyncTaskList +{ + private readonly object _initTaskLock = new(); + private Task _initTask = null; + + public void Add(Task task, CancellationToken cancellationToken) + { + lock (_initTaskLock) + { + var prevInitTask = _initTask; + _initTask = prevInitTask?.ContinueWith(_ => task, cancellationToken) ?? task; + } + } + + /// + /// Awaits (if any) bus intialization (e.g. topology provisining) before we can produce message into the bus. + /// + /// + public async Task EnsureAllFinished() + { + var initTask = _initTask; + if (initTask != null) + { + await initTask.ConfigureAwait(false); + + lock (_initTaskLock) + { + if (ReferenceEquals(_initTask, initTask)) + { + _initTask = null; + } + } + } + } + + +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index 98c4abf7..f6886b35 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -65,8 +65,7 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, #endregion - private readonly object _initTaskLock = new(); - private Task _initTask = null; + protected readonly AsyncTaskList InitTaskList = new(); #region Start & Stop @@ -110,36 +109,6 @@ protected MessageBusBase(MessageBusSettings settings) PendingRequestStore = PendingRequestManager.Store; } - protected void AddInit(Task task) - { - lock (_initTaskLock) - { - var prevInitTask = _initTask; - _initTask = prevInitTask?.ContinueWith(_ => task, CancellationToken) ?? task; - } - } - - /// - /// Awaits (if any) bus intialization (e.g. topology provisining) before we can produce message into the bus. - /// - /// - protected async Task EnsureInitFinished() - { - var initTask = _initTask; - if (initTask != null) - { - lock (_initTaskLock) - { - if (ReferenceEquals(_initTask, initTask)) - { - _initTask = null; - } - } - - await initTask.ConfigureAwait(false); - } - } - protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider); protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); } @@ -154,7 +123,7 @@ protected void OnBuildProvider() Build(); // Notify the bus has been created - before any message can be produced - AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created)); + InitTaskList.Add(OnBusLifecycle(MessageBusLifecycleEventType.Created), CancellationToken); // Auto start consumers if enabled if (Settings.AutoStartConsumers) @@ -222,7 +191,7 @@ public async Task Start() try { - await EnsureInitFinished(); + await InitTaskList.EnsureAllFinished(); _logger.LogInformation("Starting consumers for {BusName} bus...", Name); await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false); @@ -261,7 +230,7 @@ public async Task Stop() try { - await EnsureInitFinished(); + await InitTaskList.EnsureAllFinished(); _logger.LogInformation("Stopping consumers for {BusName} bus...", Name); await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false); @@ -450,7 +419,7 @@ public async virtual Task ProducePublish(object message, string path = null, IDi { if (message == null) throw new ArgumentNullException(nameof(message)); AssertActive(); - await EnsureInitFinished(); + await InitTaskList.EnsureAllFinished(); // check if the cancellation was already requested cancellationToken.ThrowIfCancellationRequested(); @@ -548,8 +517,8 @@ public virtual async Task ProduceSend(object request, stri { if (request == null) throw new ArgumentNullException(nameof(request)); AssertActive(); - AssertRequestResponseConfigured(); - await EnsureInitFinished(); + AssertRequestResponseConfigured(); + await InitTaskList.EnsureAllFinished(); // check if the cancellation was already requested cancellationToken.ThrowIfCancellationRequested(); diff --git a/src/Tests/SlimMessageBus.Host.Test/Collections/AsyncTaskListTests.cs b/src/Tests/SlimMessageBus.Host.Test/Collections/AsyncTaskListTests.cs new file mode 100644 index 00000000..a9bfa7e4 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test/Collections/AsyncTaskListTests.cs @@ -0,0 +1,37 @@ +namespace SlimMessageBus.Host.Test.Collections; + +using System.Collections.Concurrent; + +using SlimMessageBus.Host.Collections; + +public class AsyncTaskListTests +{ + private readonly AsyncTaskList _sut = new(); + + [Fact] + public async Task Given_TaskAdded_When_EnsureAllFinished_Then_TaskIsCompleted() + { + // arrange + var numberList = new ConcurrentQueue(); + + async Task RunTask(int n) + { + numberList.Enqueue(n); + await Task.Delay(100); + } + + _sut.Add(RunTask(1), CancellationToken.None); + _sut.Add(RunTask(2), CancellationToken.None); + + // act + await _sut.EnsureAllFinished(); + + // assert + numberList.Should().HaveCount(2); + numberList.TryDequeue(out var n1).Should().BeTrue(); + n1.Should().Be(1); + numberList.TryDequeue(out var n2).Should().BeTrue(); + n2.Should().Be(2); + } + +}