Skip to content

Commit

Permalink
AmazonSQS transport
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 17, 2024
1 parent 41ade08 commit 3fe4e68
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 54 deletions.
31 changes: 19 additions & 12 deletions src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected override void Dispose(bool disposing)
protected override void Build()
{
base.Build();
AddInit(InitAsync());
InitTaskList.Add(InitAsync(), CancellationToken);
}

private byte[] GetMessagePayload(Message message)
Expand Down Expand Up @@ -94,20 +94,27 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>
/// <returns></returns>
private async Task InitAsync()
{
_logger.LogInformation("Ensuring client is authenticate");
// Ensure the client finished the first authentication
await _clientProvider.EnsureClientAuthenticated();
try
{
_logger.LogInformation("Ensuring client is authenticate");
// Ensure the client finished the first authentication
await _clientProvider.EnsureClientAuthenticated();

// Provision the topology if enabled
if (ProviderSettings.TopologyProvisioning?.Enabled ?? false)
{
_logger.LogInformation("Provisioning topology");
await ProvisionTopology();
}

// Provision the topology if enabled
if (ProviderSettings.TopologyProvisioning?.Enabled ?? false)
// Read the Queue/Topic URLs for the producers
_logger.LogInformation("Populating queue URLs");
await PopulatePathToUrlMappings();
}
catch (Exception ex)
{
_logger.LogInformation("Provisioning topology");
await ProvisionTopology();
_logger.LogError(ex, "SQS Transport initialization failed: {ErrorMessage}", ex.Message);
}

// Read the Queue/Topic URLs for the producers
_logger.LogInformation("Populating queue URLs");
await PopulatePathToUrlMappings();
}

public override async Task ProvisionTopology()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected override void Build()

if (ProviderSettings.TopologyProvisioning?.Enabled ?? false)
{
AddInit(ProvisionTopology());
InitTaskList.Add(ProvisionTopology(), CancellationToken);
}

_client = ProviderSettings.ClientFactory();
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public NatsMessageBus(MessageBusSettings settings, NatsMessageBusSettings provid
protected override void Build()
{
base.Build();
AddInit(CreateConnectionAsync());
InitTaskList.Add(CreateConnectionAsync(), CancellationToken);
}

private Task CreateConnectionAsync()
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected override void Build()
{
base.Build();

AddInit(CreateConnection());
InitTaskList.Add(CreateConnection(), CancellationToken);
}

protected override async Task CreateConsumers()
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Sql/SqlMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ protected override void Build()
{
base.Build();

AddInit(ProvisionTopology());
InitTaskList.Add(ProvisionTopology(), CancellationToken);
}

public override async Task ProvisionTopology()
Expand Down
48 changes: 48 additions & 0 deletions src/SlimMessageBus.Host/Collections/AsyncTaskList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace SlimMessageBus.Host.Collections;

public interface IAsyncTaskList
{
void Add(Task task, CancellationToken cancellationToken);
Task EnsureAllFinished();
}

/// <summary>
/// Tracks a list of Tasks that have to be awaited before we can proceed.
/// </summary>
public class AsyncTaskList : IAsyncTaskList
{
private readonly object _initTaskLock = new();
private Task _initTask = null;

public void Add(Task task, CancellationToken cancellationToken)
{
lock (_initTaskLock)
{
var prevInitTask = _initTask;
_initTask = prevInitTask?.ContinueWith(_ => task, cancellationToken) ?? task;
}
}

/// <summary>
/// Awaits (if any) bus intialization (e.g. topology provisining) before we can produce message into the bus.
/// </summary>
/// <returns></returns>
public async Task EnsureAllFinished()
{
var initTask = _initTask;
if (initTask != null)
{
await initTask.ConfigureAwait(false);

lock (_initTaskLock)
{
if (ReferenceEquals(_initTask, initTask))
{
_initTask = null;
}
}
}
}


}
45 changes: 7 additions & 38 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable,

#endregion

private readonly object _initTaskLock = new();
private Task _initTask = null;
protected readonly AsyncTaskList InitTaskList = new();

#region Start & Stop

Expand Down Expand Up @@ -110,36 +109,6 @@ protected MessageBusBase(MessageBusSettings settings)
PendingRequestStore = PendingRequestManager.Store;
}

protected void AddInit(Task task)
{
lock (_initTaskLock)
{
var prevInitTask = _initTask;
_initTask = prevInitTask?.ContinueWith(_ => task, CancellationToken) ?? task;
}
}

/// <summary>
/// Awaits (if any) bus intialization (e.g. topology provisining) before we can produce message into the bus.
/// </summary>
/// <returns></returns>
protected async Task EnsureInitFinished()
{
var initTask = _initTask;
if (initTask != null)
{
lock (_initTaskLock)
{
if (ReferenceEquals(_initTask, initTask))
{
_initTask = null;
}
}

await initTask.ConfigureAwait(false);
}
}

protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider);

protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); }
Expand All @@ -154,7 +123,7 @@ protected void OnBuildProvider()
Build();

// Notify the bus has been created - before any message can be produced
AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created));
InitTaskList.Add(OnBusLifecycle(MessageBusLifecycleEventType.Created), CancellationToken);

// Auto start consumers if enabled
if (Settings.AutoStartConsumers)
Expand Down Expand Up @@ -222,7 +191,7 @@ public async Task Start()

try
{
await EnsureInitFinished();
await InitTaskList.EnsureAllFinished();

_logger.LogInformation("Starting consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false);
Expand Down Expand Up @@ -261,7 +230,7 @@ public async Task Stop()

try
{
await EnsureInitFinished();
await InitTaskList.EnsureAllFinished();

_logger.LogInformation("Stopping consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false);
Expand Down Expand Up @@ -450,7 +419,7 @@ public async virtual Task ProducePublish(object message, string path = null, IDi
{
if (message == null) throw new ArgumentNullException(nameof(message));
AssertActive();
await EnsureInitFinished();
await InitTaskList.EnsureAllFinished();

// check if the cancellation was already requested
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -548,8 +517,8 @@ public virtual async Task<TResponse> ProduceSend<TResponse>(object request, stri
{
if (request == null) throw new ArgumentNullException(nameof(request));
AssertActive();
AssertRequestResponseConfigured();
await EnsureInitFinished();
AssertRequestResponseConfigured();
await InitTaskList.EnsureAllFinished();

// check if the cancellation was already requested
cancellationToken.ThrowIfCancellationRequested();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace SlimMessageBus.Host.Test.Collections;

using System.Collections.Concurrent;

using SlimMessageBus.Host.Collections;

public class AsyncTaskListTests
{
private readonly AsyncTaskList _sut = new();

[Fact]
public async Task Given_TaskAdded_When_EnsureAllFinished_Then_TaskIsCompleted()
{
// arrange
var numberList = new ConcurrentQueue<int>();

async Task RunTask(int n)
{
numberList.Enqueue(n);
await Task.Delay(100);
}

_sut.Add(RunTask(1), CancellationToken.None);
_sut.Add(RunTask(2), CancellationToken.None);

// act
await _sut.EnsureAllFinished();

// assert
numberList.Should().HaveCount(2);
numberList.TryDequeue(out var n1).Should().BeTrue();
n1.Should().Be(1);
numberList.TryDequeue(out var n2).Should().BeTrue();
n2.Should().Be(2);
}

}

0 comments on commit 3fe4e68

Please sign in to comment.