Skip to content

Commit

Permalink
Amazon.SQS transport
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 24, 2024
1 parent a8b4b34 commit c3c500b
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 214 deletions.
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

- [Introduction](intro.md)
- Providers
- [Amazon SQS/SNS](provider_amazon_sqs_.md)
- [Amazon SQS/SNS](provider_amazon_sqs.md)
- [Apache Kafka](provider_kafka.md)
- [Azure EventHubs](provider_azure_eventhubs.md)
- [Azure ServiceBus](provider_azure_servicebus.md)
Expand Down
146 changes: 41 additions & 105 deletions docs/provider_amazon_sqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ Please read the [Introduction](intro.md) before reading this provider documentat

- [Configuration](#configuration)
- [Producing Messages](#producing-messages)
- [Headers](#headers)
- [Consuming Messages](#consuming-messages)
- [Consumer context](#consumer-context)
- [Exception Handling for Consumers](#exception-handling-for-consumers)
- [Transport Specific Settings](#transport-specific-settings)
- [Transport Specific Settings](#transport-specific-settings)
- [Headers](#headers)
- [Request-Response Configuration](#request-response-configuration)
- [Produce Request Messages](#produce-request-messages)
- [Handle Request Messages](#handle-request-messages)
- [Topology Provisioning](#topology-provisioning)
- [Validation of Topology](#validation-of-topology)
- [Trigger Topology Provisioning](#trigger-topology-provisioning)

## Configuration
Expand Down Expand Up @@ -98,17 +97,6 @@ bus.Publish(msg);

Setting the default queue name `DefaultQueue()` for a message type will implicitly configure `UseQueue()` for that message type. By default, if configuration is not provided then runtime will assume a message needs to be sent on a topic (and works as if `UseTopic()` was configured).

## Headers

Headers is Amazon SQS have a specific requirements as explained in [this guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes).

The message header values in SMB could be any type (`object`) and the conversation to the possible header value is achieved through the [ISqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/ISqsHeaderSerializer.cs).
By default the implementation [DefaultSqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs).

> In the default header serializer implementation the string value is attempted to be converted into Guid, bool, and DateTime.
The transport provider settings [SqsMessageBusSettings](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs) have a property that allows to override and supply a custom implementation.

## Consuming Messages

To consume `TMessage` by `TConsumer` from `some-topic` Azure Service Bus topic use:
Expand Down Expand Up @@ -147,38 +135,54 @@ If you need to send only selected messages to DLQ, wrap the body of your consume

SMB will also set a user property `SMB.Exception` on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

### Transport Specific Settings
## Transport Specific Settings

The consumer expose additional settings from the underlying ASB client:
The consumer and producers expose additional settings:

- [PrefetchCount](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.prefetchcount)
- [MaxAutoLockRenewalDuration](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.maxautolockrenewalduration)
- [SubQueue](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.subqueue)
- [EnableFifo](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.prefetchcount)
- [MaxMessageCount](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.maxautolockrenewalduration)
```cs
mbb.Produce<TMessage>(x => x
// enables FIFO and will use the specific deduplication id and group id
.EnableFifo(f => f
.DeduplicationId((m, h) => (m.Counter + 1000).ToString())
.GroupId((m, h) => m.Counter % 2 == 0 ? "even" : "odd")
)
);
```

```cs
mbb.Consume<TMessage>(x => x
.WithConsumer<TConsumer>()
.Queue("some-queue")
.MaxAutoLockRenewalDuration(TimeSpan.FromMinutes(7))
.SubQueue(SubQueue.DeadLetter)
.PrefetchCount(10)
.SubscriptionSqlFilter("1=1") // ASB subscription SQL filters can also be created - see topology creation section
.MaxMessageCount(10)
.Instances(1));
```

Where applicable, selected settings can have the default values applied using `ServiceBusMessageBusSettings`:
Where applicable, selected settings can have the default values applied using [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs):

```cs
mbb.WithProviderServiceBus(cfg =>
mbb.WithProviderAmazonSQS(cfg =>
{
// ...
cfg.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(7);
cfg.PrefetchCount = 10;
cfg.MaxMessageCount = 10;
})
```

However, the specific settings applied at the consumer level take priority.

## Headers

Headers is Amazon SQS have a specific requirements as explained in [this guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes).
The message header values in SMB could be any type (`object`) and the conversation to the possible header value is achieved through the [ISqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/ISqsHeaderSerializer.cs).
By default the implementation [DefaultSqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs).

> In the default header serializer implementation the string value is attempted to be converted into Guid, bool, and DateTime.

The transport provider settings [SqsMessageBusSettings](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs) have a property that allows to override and supply a custom implementation.

## Request-Response Configuration

### Produce Request Messages
Expand All @@ -191,7 +195,6 @@ The SMB Azure Service Bus provider needs to know if you want to receive response
mbb.ExpectRequestResponses(x =>
{
x.ReplyToTopic("test-echo-resp");
x.SubscriptionName("response-consumer");
x.DefaultTimeout(TimeSpan.FromSeconds(60));
});
```
Expand All @@ -208,11 +211,11 @@ mbb.ExpectRequestResponses(x =>

In either case, each of your micro-service instances must have its own dedicated queue (or topic). At the end, when your n-th service instance sends a request, we want the response to arrive back to that n-th service instance. Specifically, this is required so that the internal `Task<TResponse>` of `bus.Send(TRequest)` is resumed and the parent task can continue.

It is preferred to receive responses on queues rather than topics.
It is recommended to receive responses on queues rather than topics.

### Handle Request Messages

The request processing service (the responding service) in the case of SMB Azure Service Bus providers needs to configure the queue (or topic) that the request messages will be consumed from:
The request processing service (the responding service) in the case of SMB Amazon SQS Bus providers needs to configure the queue (or topic) that the request messages will be consumed from:

```cs
mbb.Handle<EchoRequest, EchoResponse>(x => x
Expand All @@ -234,8 +237,8 @@ that topic. You cannot mix sending to a topic and consuming from a queue (or vic

## Topology Provisioning

Amazon SQS transport provider can automatically create the required queue/topic/subscription that have been declared as part of the SMB configuration.
The provisioning happens as soon as the SMB instance is created and prior any consumers start processing messages. The creation happens only when a particular topic/queue/subscription does not exist. If it exist the SMB will not alter it.
Amazon SQS transport provider can automatically create the required queue that have been declared as part of the SMB configuration.
The provisioning happens as soon as the SMB instance is created and prior any consumers start processing messages. The creation happens only when a particular queue does not exist. If it exist the SMB will not alter it.

> The topology creation is turned on by default.

Expand All @@ -248,80 +251,33 @@ mbb.WithProviderAmazonSQS(cfg =>
});
```

When a queue/topic/subscription needs to be created, SMB will create the underlying SQS client options object, then will use the provided delegate to populate the settings ([CreateQueueOptions](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.administration.createqueueoptions?view=azure-dotnet), [CreateTopicOptions](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.administration.createtopicoptions?view=azure-dotnet), [CreateSubscriptionOptions](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.administration.createsubscriptionoptions?view=azure-dotnet)).
When a queue needs to be created, SMB will create the underlying SQS client options object, then will use the provided delegate to populate the settings ([CreateQueueOptions](../src/SlimMessageBus.Host.AmazonSQS/SqsTopologySettings.cs)).

The bus wide default creation options can be set in this way:

```cs
mbb.WithProviderServiceBus(cfg =>
mbb.WithProviderAmazonSQS(cfg =>
{
//cfg.ConnectionString = serviceBusConnectionString;
cfg.TopologyProvisioning = new()
{
CreateQueueOptions = (options) =>
{
options.EnablePartitioning = true;
options.RequiresDuplicateDetection = true;
options.DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(5);
},
CreateTopicOptions = (options) =>
{
options.EnablePartitioning = true;
options.RequiresDuplicateDetection = true;
options.DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(5);
},
CreateSubscriptionOptions = (options) =>
{
options.LockDuration = TimeSpan.FromMinutes(5);
// See
}
};
});
```

The particular producer or consumer can introduce more specific settings:

```cs
mbb.Produce<SomeMessage>(x => x
.DefaultTopic("some-topic")
.CreateTopicOptions((options) =>
{
options.RequiresDuplicateDetection = false;
})
);

mbb.Consume<SomeMessage>(x => x
.Topic("some-topic")
.WithConsumer<SomeMessageConsumer>()
.SubscriptionName("some-service")
.SubscriptionSqlFilter("1=1") // this will create a rule with SQL filter
.CreateTopicOptions((options) =>
{
options.RequiresDuplicateDetection = false;
})
);
```

For any queue/topic/subscription/rule that needs to be created, the relevant options object is created, bus wide options are applied, consumer/producer specific settings are applied and lastly the resource is created in ASB. That allows to combine settings and ensure more specific values can be applied.

> The setting `RequiresSession` on the `CreateQueueOptions` and `CreateSubscriptionOptions` is automatically populated by SMB depending if [sessions have been enabled](#asb-sessions) for the queue/subscription consumer.

Also, it might be desired that only producers or consumers can create the respective queue/topic/subscription. This can be specified:
Also, it might be desired that only producers or consumers can create the respective queue. This can be specified:

```cs
mbb.WithProviderServiceBus(cfg =>
mbb.WithProviderAmazonSQS(cfg =>
{
//cfg.ConnectionString = serviceBusConnectionString;
cfg.TopologyProvisioning = new ServiceBusTopologySettings
cfg.TopologyProvisioning = new()
{
Enabled = true,
CanProducerCreateQueue = true, // only declared producers will be used to provision queues
CanProducerCreateTopic = true, // only declared producers will be used to provision topics
CanConsumerCreateQueue = false, // the consumers will not be able to provision a missing queue
CanConsumerCreateTopic = false, // the consumers will not be able to provision a missing topic
CanConsumerCreateSubscription = true, // but the consumers will add the missing subscription if needed
CanConsumerCreateSubscriptionFilter = true, // but the consumers will add the missing filter on subscription if needed
};
});
```
Expand All @@ -332,26 +288,6 @@ By default, all the flags are enabled (set to `true`). This is for convenience.

In the case where multiple consumers share the same subscription name and topology provisioning is required, all `CreateConsumerOptions` must contain the same values for the same subscription. Filters are merged but must be equal if they use the same name.

### Validation of Topology

Where it is preferred to log any deviations from the expected topology without making any changes, the setting 'CanConsumerValidateSubscriptionFilters` can be applied.

```cs
mbb.WithProviderServiceBus(cfg =>
{
cfg.TopologyProvisioning = new ServiceBusTopologySettings
{
Enabled = true,
CanConsumerCreateTopic = false, // the consumers will not be able to provision a missing topic
CanConsumerCreateSubscription = true, // the consumers will not be able to add a missing subscription if needed
CanConsumerCreateSubscriptionFilter = true, // the consumers will not be able to add a missing filter on subscription
CanConsumerValidateSubscriptionFilters = true, // any deviations from the expected will be logged
};

...
});
```

### Trigger Topology Provisioning

Typically when the bus is created (on application process start) the topology provisioning happens (when enabled).
Expand Down
Loading

0 comments on commit c3c500b

Please sign in to comment.