Skip to content

Commit

Permalink
Amazon.SQS transport
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 24, 2024
1 parent 1e78a38 commit 6479974
Show file tree
Hide file tree
Showing 82 changed files with 2,948 additions and 254 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ jobs:
--verbosity normal \
--logger "trx;LogFilePrefix=Integration" \
--collect:"XPlat Code Coverage;Format=opencover" \
--filter "Category=Integration&Transport=Outbox"
--filter "Category=Integration"
working-directory: ./src
env:
# Connects to the Azure cloud
Expand All @@ -111,6 +111,10 @@ jobs:
azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }}
azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }}

# Connects to AWS cloud
amazon_access_key: ${{ secrets.amazon_access_key }}
amazon_secret_access_key: ${{ secrets.amazon_secret_access_key }}

_kafka_brokers: ${{ secrets.kafka_brokers }}
_kafka_username: ${{ secrets.kafka_username }}
_kafka_password: ${{ secrets.kafka_password }}
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
[![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=vulnerabilities)](https://sonarcloud.io/summary/overall?id=zarusz_SlimMessageBus)
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=alert_status)](https://sonarcloud.io/summary/overall?id=zarusz_SlimMessageBus)

> The v2 release is available (see [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0)).
> The v3 release is [under construction](https://github.com/zarusz/SlimMessageBus/tree/release/v3).
> The v2 release is available (see [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0)).
- [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus)
- [Docs](#docs)
Expand Down Expand Up @@ -47,6 +47,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i

- [Introduction](docs/intro.md)
- Providers:
- [Amazon SQS/SNS](docs/provider_amazon_sqs.md)
- [Apache Kafka](docs/provider_kafka.md)
- [Azure EventHubs](docs/provider_azure_eventhubs.md)
- [Azure ServiceBus](docs/provider_azure_servicebus.md)
Expand All @@ -69,11 +70,12 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `SlimMessageBus` | The core API for SlimMessageBus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.svg)](https://www.nuget.org/packages/SlimMessageBus) |
| **Transport providers** | | |
| `.Host.AmazonSQS` | Transport provider for Amazon SQS / SNS | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AmazonSQS.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AmazonSQS) |
| `.Host.AzureEventHub` | Transport provider for Azure Event Hubs | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureEventHub.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureEventHub) |
| `.Host.AzureServiceBus` | Transport provider for Azure Service Bus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureServiceBus.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureServiceBus) |
| `.Host.Kafka` | Transport provider for Apache Kafka | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Kafka.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Kafka) |
| `.Host.Memory` | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Memory.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Memory) |
| `.Host.MQTT` | Transport provider for MQTT | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.MQTT.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.MQTT) |
| `.Host.Memory` | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Memory.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Memory) |
| `.Host.NATS` | Transport provider for [NATS](https://nats.io/) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.NATS.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.NATS) |
| `.Host.RabbitMQ` | Transport provider for RabbitMQ | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.RabbitMQ.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.RabbitMQ) |
| `.Host.Redis` | Transport provider for Redis | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Redis.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Redis) |
Expand Down Expand Up @@ -186,7 +188,7 @@ services.AddSlimMessageBus(mbb =>
// Scan assembly for consumers, handlers, interceptors, and register into MSDI
.AddServicesFromAssemblyContaining<SomeMessageConsumer>()
//.AddServicesFromAssembly(Assembly.GetExecutingAssembly());
//.AddServicesFromAssembly(Assembly.GetExecutingAssembly())
// Add JSON serializer
.AddJsonSerializer(); // requires SlimMessageBus.Host.Serialization.Json or SlimMessageBus.Host.Serialization.SystemTextJson package
Expand Down
3 changes: 2 additions & 1 deletion build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ $projects = @(
"SlimMessageBus.Host.Sql",
"SlimMessageBus.Host.Sql.Common",
"SlimMessageBus.Host.Nats",

"SlimMessageBus.Host.AmazonSQS",

"SlimMessageBus.Host.FluentValidation",

"SlimMessageBus.Host.Outbox",
Expand Down
1 change: 1 addition & 0 deletions docs/NuGet.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SlimMessageBus additionally provides request-response implementation over messag

Transports:

- Amazon SQS/SNS
- Apache Kafka
- Azure Event Hub
- Azure Service Bus
Expand Down
9 changes: 5 additions & 4 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

- [Introduction](intro.md)
- Providers
- [Amazon SQS/SNS](provider_amazon_sqs.md)
- [Apache Kafka](provider_kafka.md)
- [Azure Event Hubs](provider_azure_eventhubs.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Azure EventHubs](provider_azure_eventhubs.md)
- [Azure ServiceBus](provider_azure_servicebus.md)
- [Hybrid](provider_hybrid.md)
- [MQTT](provider_mqtt.md)
- [Memory](provider_memory.md)
- [RabbitMq](provider_rabbitmq.md)
- [NATS](provider_nats.md)
- [RabbitMQ](provider_rabbitmq.md)
- [Redis](provider_redis.md)
- [SQL](provider_sql.md)
- [NATS](provider_nats.md)
- [Serialization Plugins](serialization.md)
297 changes: 297 additions & 0 deletions docs/provider_amazon_sqs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
# Amazon SQS Provider for SlimMessageBus <!-- omit in toc -->

Before diving into this provider documentation, please make sure to read the [Introduction](intro.md).

### Table of Contents

- [Configuration](#configuration)
- [Amazon SNS](#amazon-sns)
- [Producing Messages](#producing-messages)
- [Consuming Messages](#consuming-messages)
- [Consumer Context](#consumer-context)
- [Transport-Specific Settings](#transport-specific-settings)
- [Headers](#headers)
- [Request-Response Configuration](#request-response-configuration)
- [Producing Request Messages](#producing-request-messages)
- [Handling Request Messages](#handling-request-messages)
- [Topology Provisioning](#topology-provisioning)
- [Triggering Topology Provisioning](#triggering-topology-provisioning)

## Configuration

To configure Amazon SQS as your transport provider, you need to specify the AWS region and choose an authentication method:

- **Static Credentials**: [Learn more](https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html)
- **Temporary Credentials**: [Learn more](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS)

```csharp
using SlimMessageBus.Host.AmazonSQS;
```

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderAmazonSQS(cfg =>
{
cfg.UseRegion(Amazon.RegionEndpoint.EUCentral1);

// Use static credentials: https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html
cfg.UseCredentials(accessKey, secretAccessKey);

// Use temporary credentials: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS
//cfg.UseTemporaryCredentials(roleArn, roleSessionName);
AdditionalSqsSetup(cfg);
});
});
```

For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS client object and control topology provisioning for advanced scenarios.

## Amazon SNS

Support for Amazon SNS (Simple Notification Service) will be added soon to this transport plugin.

## Producing Messages

To produce a `TMessage` to an Amazon SQS queue or an SNS topic:

```csharp
// Send TMessage to Amazon SQS queue
mbb.Produce<TMessage>(x => x.UseQueue());

// OR
// Send TMessage to Amazon SNS topic
mbb.Produce<TMessage>(x => x.UseTopic());
```

This configuration sends `TMessage` to the specified Amazon SQS queue or SNS topic. You can then produce messages like this:

```csharp
TMessage msg;

// Send msg to "some-queue"
await bus.Publish(msg, "some-queue");

// OR
// Send msg to "some-topic"
await bus.Publish(msg, "some-topic");
```

The second parameter (`path`) specifies the name of the queue or topic to use.

If you have a default queue or topic configured for a message type:

```csharp
mbb.Produce<TMessage>(x => x.DefaultQueue("some-queue"));

// OR
mbb.Produce<TMessage>(x => x.DefaultTopic("some-topic"));
```

You can simply call `bus.Publish(msg)` without providing the second parameter:

```csharp
// Send msg to the default queue "some-queue" (or "some-topic")
bus.Publish(msg);
```

Note that if no explicit configuration is provided, the system assumes the message will be sent to a topic (equivalent to using `UseTopic()`).

## Consuming Messages

To consume messages of type `TMessage` by `TConsumer` from an Amazon SNS topic named `some-topic`:

```csharp
mbb.Consume<TMessage>(x => x
.Queue("some-topic")
//.WithConsumer<TConsumer>());
```

To consume messages from an Amazon SQS queue named `some-queue`:

```csharp
mbb.Consume<TMessage>(x => x
.Queue("some-queue")
//.WithConsumer<TConsumer>());
```

### Consumer Context

The consumer can implement the `IConsumerWithContext` interface to access native Amazon SQS messages:

```csharp
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message, CancellationToken cancellationToken)
{
// Access the native Amazon SQS message:
var transportMessage = Context.GetTransportMessage(); // Amazon.SQS.Model.Message type
}
}
```

This can be helpful to extract properties like `MessageId` or `Attributes` from the native SQS message.

## Transport-Specific Settings

Producer and consumer configurations have additional settings like:

- **EnableFifo**
- **MaxMessageCount**

For a producer:

```csharp
mbb.Produce<TMessage>(x => x
.EnableFifo(f => f
.DeduplicationId((m, h) => (m.Counter + 1000).ToString())
.GroupId((m, h) => m.Counter % 2 == 0 ? "even" : "odd")
)
);
```

For a consumer:

```csharp
mbb.Consume<TMessage>(x => x
.WithConsumer<TConsumer>()
.Queue("some-queue")
.MaxMessageCount(10)
.Instances(1));
```

These default values can also be set at the message bus level using `SqsMessageBusSettings`:

```csharp
mbb.WithProviderAmazonSQS(cfg =>
{
cfg.MaxMessageCount = 10;
});
```

Settings at the consumer level take priority over the global defaults.

## Headers

Amazon SQS has specific requirements for message headers, detailed in [this guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes).
Headers in SlimMessageBus (SMB) can be any type (`object`). To convert them, SMB uses a header serializer, like the default implementation: [DefaultSqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs).

By default, string values are attempted to be converted to `Guid`, `bool`, and `DateTime`.

You can override the serializer through `SqsMessageBusSettings`.

## Request-Response Configuration

### Producing Request Messages

When sending a request, you need to specify whether it should be delivered to a topic or queue (see [Producing Messages](#producing-messages)).

If you want to receive responses on a Service Bus topic:

```csharp
mbb.ExpectRequestResponses(x =>
{
x.ReplyToTopic("test-echo-resp");
x.DefaultTimeout(TimeSpan.FromSeconds(60));
});
```

Or to receive responses on a queue:

```csharp
mbb.ExpectRequestResponses(x =>
{
x.ReplyToQueue("test-echo-queue-resp");
x.DefaultTimeout(TimeSpan.FromSeconds(60));
});
```

Each service instance should have a dedicated queue or topic to ensure the response arrives back at the correct instance. This ensures the internal task `Task<TResponse>` of `bus.Send(TRequest)` resumes correctly.

### Handling Request Messages

For services handling requests, you must configure the request consumption settings for a specific queue or topic:

```csharp
mbb.Handle<EchoRequest, EchoResponse>(x => x
.Topic(topic)
.SubscriptionName("handler")
.WithHandler<EchoRequestHandler>()
.Instances(2));
```

Or for a queue:

```csharp
mbb.Handle<EchoRequest, EchoResponse>(x => x
.Queue(queue)
.WithHandler<EchoRequestHandler>()
.Instances(2));
```

Ensure that if a request is sent to a queue, the consumer is also listening on that queue (and similarly for topics). Mixing queues and topics for requests and responses is not supported.

## Topology Provisioning

Amazon SQS can automatically create any queues declared in your SMB configuration when needed. This process occurs when the SMB instance starts, and only for queues that do not yet exist. If a queue already exists, it will not be modified.

> **Note**: Automatic topology creation is enabled by default.

To disable automatic topology provisioning:

```csharp
mbb.WithProviderAmazonSQS(cfg =>
{
cfg.TopologyProvisioning.Enabled = false;
});
```

You can also customize how queues are created by modifying the `CreateQueueOptions`:

```csharp
mbb.WithProviderAmazonSQS(cfg =>
{
cfg.TopologyProvisioning = new()
{
CreateQueueOptions = (options) =>
{
// Customize queue options here
}
};
});
```

You can control which services create queues:

```csharp
mbb.WithProviderAmazonSQS(cfg =>
{
cfg.TopologyProvisioning = new()
{
Enabled = true,
CanProducerCreateQueue = true, // Only producers can create queues
CanConsumerCreateQueue = false, // Consumers cannot create queues
};
});
```

By default, both flags are enabled (`true`). This flexibility allows you to define ownership of queues/topics clearlye.g., producers handle queue creation while consumers manage subscriptions.

## Triggering Topology Provisioning

Typically, topology provisioning occurs when the bus is first created (e.g., at application startup). If the underlying topology changes (e.g., queues or topics are manually deleted), you may need to re-trigger provisioning:

```csharp
ITopologyControl ctrl = // injected
await ctrl.ProvisionTopology();
```

This allows you to recreate missing elements in the infrastructure without restarting the whole application.
Loading

0 comments on commit 6479974

Please sign in to comment.