diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 424ad509..a4502db1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -102,7 +102,7 @@ jobs: --verbosity normal \ --logger "trx;LogFilePrefix=Integration" \ --collect:"XPlat Code Coverage;Format=opencover" \ - --filter "Category=Integration&Transport=Outbox" + --filter "Category=Integration" working-directory: ./src env: # Connects to the Azure cloud @@ -111,6 +111,10 @@ jobs: azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }} azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }} + # Connects to AWS cloud + amazon_access_key: ${{ secrets.amazon_access_key }} + amazon_secret_access_key: ${{ secrets.amazon_secret_access_key }} + _kafka_brokers: ${{ secrets.kafka_brokers }} _kafka_username: ${{ secrets.kafka_username }} _kafka_password: ${{ secrets.kafka_password }} diff --git a/README.md b/README.md index 0afa8dac..a5d018f5 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,8 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i [![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=vulnerabilities)](https://sonarcloud.io/summary/overall?id=zarusz_SlimMessageBus) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=alert_status)](https://sonarcloud.io/summary/overall?id=zarusz_SlimMessageBus) -> The v2 release is available (see [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0)). > The v3 release is [under construction](https://github.com/zarusz/SlimMessageBus/tree/release/v3). +> The v2 release is available (see [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0)). - [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus) - [Docs](#docs) @@ -47,6 +47,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i - [Introduction](docs/intro.md) - Providers: + - [Amazon SQS/SNS](docs/provider_amazon_sqs.md) - [Apache Kafka](docs/provider_kafka.md) - [Azure EventHubs](docs/provider_azure_eventhubs.md) - [Azure ServiceBus](docs/provider_azure_servicebus.md) @@ -69,11 +70,12 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i | ------------------------------------ | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `SlimMessageBus` | The core API for SlimMessageBus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.svg)](https://www.nuget.org/packages/SlimMessageBus) | | **Transport providers** | | | +| `.Host.AmazonSQS` | Transport provider for Amazon SQS / SNS | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AmazonSQS.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AmazonSQS) | | `.Host.AzureEventHub` | Transport provider for Azure Event Hubs | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureEventHub.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureEventHub) | | `.Host.AzureServiceBus` | Transport provider for Azure Service Bus | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AzureServiceBus.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AzureServiceBus) | | `.Host.Kafka` | Transport provider for Apache Kafka | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Kafka.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Kafka) | -| `.Host.Memory` | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Memory.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Memory) | | `.Host.MQTT` | Transport provider for MQTT | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.MQTT.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.MQTT) | +| `.Host.Memory` | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Memory.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Memory) | | `.Host.NATS` | Transport provider for [NATS](https://nats.io/) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.NATS.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.NATS) | | `.Host.RabbitMQ` | Transport provider for RabbitMQ | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.RabbitMQ.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.RabbitMQ) | | `.Host.Redis` | Transport provider for Redis | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Redis.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Redis) | @@ -186,7 +188,7 @@ services.AddSlimMessageBus(mbb => // Scan assembly for consumers, handlers, interceptors, and register into MSDI .AddServicesFromAssemblyContaining() - //.AddServicesFromAssembly(Assembly.GetExecutingAssembly()); + //.AddServicesFromAssembly(Assembly.GetExecutingAssembly()) // Add JSON serializer .AddJsonSerializer(); // requires SlimMessageBus.Host.Serialization.Json or SlimMessageBus.Host.Serialization.SystemTextJson package diff --git a/build/tasks.ps1 b/build/tasks.ps1 index 8eb62a4c..09afe10c 100644 --- a/build/tasks.ps1 +++ b/build/tasks.ps1 @@ -35,7 +35,8 @@ $projects = @( "SlimMessageBus.Host.Sql", "SlimMessageBus.Host.Sql.Common", "SlimMessageBus.Host.Nats", - + "SlimMessageBus.Host.AmazonSQS", + "SlimMessageBus.Host.FluentValidation", "SlimMessageBus.Host.Outbox", diff --git a/docs/NuGet.md b/docs/NuGet.md index df5f1c9f..23b474ef 100644 --- a/docs/NuGet.md +++ b/docs/NuGet.md @@ -6,6 +6,7 @@ SlimMessageBus additionally provides request-response implementation over messag Transports: +- Amazon SQS/SNS - Apache Kafka - Azure Event Hub - Azure Service Bus diff --git a/docs/README.md b/docs/README.md index dbbeca35..088bf118 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,14 +2,15 @@ - [Introduction](intro.md) - Providers + - [Amazon SQS/SNS](provider_amazon_sqs.md) - [Apache Kafka](provider_kafka.md) - - [Azure Event Hubs](provider_azure_eventhubs.md) - - [Azure Service Bus](provider_azure_servicebus.md) + - [Azure EventHubs](provider_azure_eventhubs.md) + - [Azure ServiceBus](provider_azure_servicebus.md) - [Hybrid](provider_hybrid.md) - [MQTT](provider_mqtt.md) - [Memory](provider_memory.md) - - [RabbitMq](provider_rabbitmq.md) + - [NATS](provider_nats.md) + - [RabbitMQ](provider_rabbitmq.md) - [Redis](provider_redis.md) - [SQL](provider_sql.md) - - [NATS](provider_nats.md) - [Serialization Plugins](serialization.md) diff --git a/docs/intro.md b/docs/intro.md index e415e8e6..909c2f5a 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -41,6 +41,8 @@ - [Logging](#logging) - [Debugging](#debugging) - [Provider specific functionality](#provider-specific-functionality) +- [Topology Provisioning](#topology-provisioning) + - [Triggering Topology Provisioning](#triggering-topology-provisioning) ## Configuration @@ -1142,4 +1144,20 @@ Providers: - [Memory](provider_memory.md) - [RabbitMQ](provider_rabbitmq.md) - [Redis](provider_redis.md) -- [SQL](provider_sql.md) \ No newline at end of file +- [SQL](provider_sql.md) + +## Topology Provisioning + +Most of the transport providers support the automatic creation of the needed messaging topology (queues, topics, subscriptions, etc). + +### Triggering Topology Provisioning + +Topology provisioning occurs when the bus is first created (e.g., at application startup). If the underlying topology changes (e.g., queues or topics are manually deleted), you may need to re-trigger provisioning programmatically: + +```csharp +ITopologyControl ctrl = // injected + +await ctrl.ProvisionTopology(); +``` + +This allows to recreate missing elements in the infrastructure without restarting the whole application. \ No newline at end of file diff --git a/docs/intro.t.md b/docs/intro.t.md index 31a7bf0c..f5512cba 100644 --- a/docs/intro.t.md +++ b/docs/intro.t.md @@ -41,6 +41,8 @@ - [Logging](#logging) - [Debugging](#debugging) - [Provider specific functionality](#provider-specific-functionality) +- [Topology Provisioning](#topology-provisioning) + - [Triggering Topology Provisioning](#triggering-topology-provisioning) ## Configuration @@ -1129,3 +1131,19 @@ Providers: - [RabbitMQ](provider_rabbitmq.md) - [Redis](provider_redis.md) - [SQL](provider_sql.md) + +## Topology Provisioning + +Most of the transport providers support the automatic creation of the needed messaging topology (queues, topics, subscriptions, etc). + +### Triggering Topology Provisioning + +Topology provisioning occurs when the bus is first created (e.g., at application startup). If the underlying topology changes (e.g., queues or topics are manually deleted), you may need to re-trigger provisioning programmatically: + +```csharp +ITopologyControl ctrl = // injected + +await ctrl.ProvisionTopology(); +``` + +This allows to recreate missing elements in the infrastructure without restarting the whole application. diff --git a/docs/provider_amazon_sqs.md b/docs/provider_amazon_sqs.md new file mode 100644 index 00000000..205f1a6a --- /dev/null +++ b/docs/provider_amazon_sqs.md @@ -0,0 +1,286 @@ +# Amazon SQS Provider for SlimMessageBus + +Before diving into this provider documentation, please make sure to read the [Introduction](intro.md). + +### Table of Contents + +- [Configuration](#configuration) +- [Amazon SNS](#amazon-sns) +- [Producing Messages](#producing-messages) +- [Consuming Messages](#consuming-messages) + - [Consumer Context](#consumer-context) +- [Transport-Specific Settings](#transport-specific-settings) +- [Headers](#headers) +- [Request-Response Configuration](#request-response-configuration) + - [Producing Request Messages](#producing-request-messages) + - [Handling Request Messages](#handling-request-messages) +- [Topology Provisioning](#topology-provisioning) + +## Configuration + +To configure Amazon SQS as your transport provider, you need to specify the AWS region and choose an authentication method: + +- **Static Credentials**: [Learn more](https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html) +- **Temporary Credentials**: [Learn more](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS) + +```csharp +using SlimMessageBus.Host.AmazonSQS; +``` + +```cs +services.AddSlimMessageBus((mbb) => +{ + mbb.WithProviderAmazonSQS(cfg => + { + cfg.UseRegion(Amazon.RegionEndpoint.EUCentral1); + + // Use static credentials: https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html + cfg.UseCredentials(accessKey, secretAccessKey); + + // Use temporary credentials: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS + //cfg.UseTemporaryCredentials(roleArn, roleSessionName); + + AdditionalSqsSetup(cfg); + }); +}); +``` + +For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS client object and control topology provisioning for advanced scenarios. + +## Amazon SNS + +Support for Amazon SNS (Simple Notification Service) will be added soon to this transport plugin. + +## Producing Messages + +To produce a `TMessage` to an Amazon SQS queue or an SNS topic: + +```csharp +// Send TMessage to Amazon SQS queue +mbb.Produce(x => x.UseQueue()); + +// OR + +// Send TMessage to Amazon SNS topic +mbb.Produce(x => x.UseTopic()); +``` + +This configuration sends `TMessage` to the specified Amazon SQS queue or SNS topic. You can then produce messages like this: + +```csharp +TMessage msg; + +// Send msg to "some-queue" +await bus.Publish(msg, "some-queue"); + +// OR + +// Send msg to "some-topic" +await bus.Publish(msg, "some-topic"); +``` + +The second parameter (`path`) specifies the name of the queue or topic to use. + +If you have a default queue or topic configured for a message type: + +```csharp +mbb.Produce(x => x.DefaultQueue("some-queue")); + +// OR + +mbb.Produce(x => x.DefaultTopic("some-topic")); +``` + +You can simply call `bus.Publish(msg)` without providing the second parameter: + +```csharp +// Send msg to the default queue "some-queue" (or "some-topic") +bus.Publish(msg); +``` + +Note that if no explicit configuration is provided, the system assumes the message will be sent to a topic (equivalent to using `UseTopic()`). + +## Consuming Messages + +To consume messages of type `TMessage` by `TConsumer` from an Amazon SNS topic named `some-topic`: + +```csharp +mbb.Consume(x => x + .Queue("some-topic") + //.WithConsumer()); +``` + +To consume messages from an Amazon SQS queue named `some-queue`: + +```csharp +mbb.Consume(x => x + .Queue("some-queue") + //.WithConsumer()); +``` + +### Consumer Context + +The consumer can implement the `IConsumerWithContext` interface to access native Amazon SQS messages: + +```csharp +public class PingConsumer : IConsumer, IConsumerWithContext +{ + public IConsumerContext Context { get; set; } + + public Task OnHandle(PingMessage message, CancellationToken cancellationToken) + { + // Access the native Amazon SQS message: + var transportMessage = Context.GetTransportMessage(); // Amazon.SQS.Model.Message type + } +} +``` + +This can be helpful to extract properties like `MessageId` or `Attributes` from the native SQS message. + +## Transport-Specific Settings + +Producer and consumer configurations have additional settings like: + +- **EnableFifo** +- **MaxMessageCount** + +For a producer: + +```csharp +mbb.Produce(x => x + .EnableFifo(f => f + .DeduplicationId((m, h) => (m.Counter + 1000).ToString()) + .GroupId((m, h) => m.Counter % 2 == 0 ? "even" : "odd") + ) +); +``` + +For a consumer: + +```csharp +mbb.Consume(x => x + .WithConsumer() + .Queue("some-queue") + .MaxMessageCount(10) + .Instances(1)); +``` + +These default values can also be set at the message bus level using `SqsMessageBusSettings`: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.MaxMessageCount = 10; +}); +``` + +Settings at the consumer level take priority over the global defaults. + +## Headers + +Amazon SQS has specific requirements for message headers, detailed in [this guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes). + +Headers in SlimMessageBus (SMB) can be any type (`object`). To convert them, SMB uses a header serializer, like the default implementation: [DefaultSqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs). + +By default, string values are attempted to be converted to `Guid`, `bool`, and `DateTime`. + +You can override the serializer through `SqsMessageBusSettings`. + +## Request-Response Configuration + +### Producing Request Messages + +When sending a request, you need to specify whether it should be delivered to a topic or queue (see [Producing Messages](#producing-messages)). + +To receive responses on a queue: + +```csharp +mbb.ExpectRequestResponses(x => +{ + x.ReplyToQueue("test-echo-queue-resp"); + x.DefaultTimeout(TimeSpan.FromSeconds(60)); +}); +``` + +Or to receive responses on a topic: + +```csharp +mbb.ExpectRequestResponses(x => +{ + x.ReplyToTopic("test-echo-resp"); + x.DefaultTimeout(TimeSpan.FromSeconds(60)); +}); +``` + +Each service instance should have a dedicated queue or topic to ensure the response arrives back at the correct instance. This ensures the internal task `Task` of `bus.Send(TRequest)` resumes correctly. + +### Handling Request Messages + +For services handling requests, you must configure the request consumption settings for a specific queue (or topic): + +Example for queue: + +```csharp +mbb.Handle(x => x + .Queue(queue) + .WithHandler()); +``` + +Example for topic: + +```csharp +mbb.Handle(x => x + .Topic(topic) + .SubscriptionName("handler") + .WithHandler()); +``` + +Ensure that if a request is sent to a queue, the consumer is also listening on that queue (and similarly for topics). Mixing queues and topics for requests and responses is not supported. + +## Topology Provisioning + +Amazon SQS can automatically create any queues declared in your SMB configuration when needed. This process occurs when the SMB instance starts, and only for queues that do not yet exist. If a queue already exists, it will not be modified. + +> **Note**: Automatic topology creation is enabled by default. + +To disable automatic topology provisioning: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.TopologyProvisioning.Enabled = false; +}); +``` + +You can also customize how queues are created by modifying the `CreateQueueOptions`: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.TopologyProvisioning = new() + { + CreateQueueOptions = (options) => + { + // Customize queue options here + } + }; +}); +``` + +You can control which services create queues: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.TopologyProvisioning = new() + { + Enabled = true, + CanProducerCreateQueue = true, // Only producers can create queues + CanConsumerCreateQueue = false, // Consumers cannot create queues + }; +}); +``` + +> By default, both flags are enabled (`true`). + +This flexibility allows you to define ownership of queues/topics clearly—e.g., producers handle queue creation while consumers manage subscriptions. \ No newline at end of file diff --git a/docs/provider_amazon_sqs.t.md b/docs/provider_amazon_sqs.t.md new file mode 100644 index 00000000..d440caa5 --- /dev/null +++ b/docs/provider_amazon_sqs.t.md @@ -0,0 +1,270 @@ +# Amazon SQS Provider for SlimMessageBus + +Before diving into this provider documentation, please make sure to read the [Introduction](intro.md). + +### Table of Contents + +- [Configuration](#configuration) +- [Amazon SNS](#amazon-sns) +- [Producing Messages](#producing-messages) +- [Consuming Messages](#consuming-messages) + - [Consumer Context](#consumer-context) +- [Transport-Specific Settings](#transport-specific-settings) +- [Headers](#headers) +- [Request-Response Configuration](#request-response-configuration) + - [Producing Request Messages](#producing-request-messages) + - [Handling Request Messages](#handling-request-messages) +- [Topology Provisioning](#topology-provisioning) + +## Configuration + +To configure Amazon SQS as your transport provider, you need to specify the AWS region and choose an authentication method: + +- **Static Credentials**: [Learn more](https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html) +- **Temporary Credentials**: [Learn more](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS) + +```csharp +using SlimMessageBus.Host.AmazonSQS; +``` + +@[:cs](../src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs,ExampleSetup) + +For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS client object and control topology provisioning for advanced scenarios. + +## Amazon SNS + +Support for Amazon SNS (Simple Notification Service) will be added soon to this transport plugin. + +## Producing Messages + +To produce a `TMessage` to an Amazon SQS queue or an SNS topic: + +```csharp +// Send TMessage to Amazon SQS queue +mbb.Produce(x => x.UseQueue()); + +// OR + +// Send TMessage to Amazon SNS topic +mbb.Produce(x => x.UseTopic()); +``` + +This configuration sends `TMessage` to the specified Amazon SQS queue or SNS topic. You can then produce messages like this: + +```csharp +TMessage msg; + +// Send msg to "some-queue" +await bus.Publish(msg, "some-queue"); + +// OR + +// Send msg to "some-topic" +await bus.Publish(msg, "some-topic"); +``` + +The second parameter (`path`) specifies the name of the queue or topic to use. + +If you have a default queue or topic configured for a message type: + +```csharp +mbb.Produce(x => x.DefaultQueue("some-queue")); + +// OR + +mbb.Produce(x => x.DefaultTopic("some-topic")); +``` + +You can simply call `bus.Publish(msg)` without providing the second parameter: + +```csharp +// Send msg to the default queue "some-queue" (or "some-topic") +bus.Publish(msg); +``` + +Note that if no explicit configuration is provided, the system assumes the message will be sent to a topic (equivalent to using `UseTopic()`). + +## Consuming Messages + +To consume messages of type `TMessage` by `TConsumer` from an Amazon SNS topic named `some-topic`: + +```csharp +mbb.Consume(x => x + .Queue("some-topic") + //.WithConsumer()); +``` + +To consume messages from an Amazon SQS queue named `some-queue`: + +```csharp +mbb.Consume(x => x + .Queue("some-queue") + //.WithConsumer()); +``` + +### Consumer Context + +The consumer can implement the `IConsumerWithContext` interface to access native Amazon SQS messages: + +```csharp +public class PingConsumer : IConsumer, IConsumerWithContext +{ + public IConsumerContext Context { get; set; } + + public Task OnHandle(PingMessage message, CancellationToken cancellationToken) + { + // Access the native Amazon SQS message: + var transportMessage = Context.GetTransportMessage(); // Amazon.SQS.Model.Message type + } +} +``` + +This can be helpful to extract properties like `MessageId` or `Attributes` from the native SQS message. + +## Transport-Specific Settings + +Producer and consumer configurations have additional settings like: + +- **EnableFifo** +- **MaxMessageCount** + +For a producer: + +```csharp +mbb.Produce(x => x + .EnableFifo(f => f + .DeduplicationId((m, h) => (m.Counter + 1000).ToString()) + .GroupId((m, h) => m.Counter % 2 == 0 ? "even" : "odd") + ) +); +``` + +For a consumer: + +```csharp +mbb.Consume(x => x + .WithConsumer() + .Queue("some-queue") + .MaxMessageCount(10) + .Instances(1)); +``` + +These default values can also be set at the message bus level using `SqsMessageBusSettings`: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.MaxMessageCount = 10; +}); +``` + +Settings at the consumer level take priority over the global defaults. + +## Headers + +Amazon SQS has specific requirements for message headers, detailed in [this guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes). + +Headers in SlimMessageBus (SMB) can be any type (`object`). To convert them, SMB uses a header serializer, like the default implementation: [DefaultSqsHeaderSerializer](../src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs). + +By default, string values are attempted to be converted to `Guid`, `bool`, and `DateTime`. + +You can override the serializer through `SqsMessageBusSettings`. + +## Request-Response Configuration + +### Producing Request Messages + +When sending a request, you need to specify whether it should be delivered to a topic or queue (see [Producing Messages](#producing-messages)). + +To receive responses on a queue: + +```csharp +mbb.ExpectRequestResponses(x => +{ + x.ReplyToQueue("test-echo-queue-resp"); + x.DefaultTimeout(TimeSpan.FromSeconds(60)); +}); +``` + +Or to receive responses on a topic: + +```csharp +mbb.ExpectRequestResponses(x => +{ + x.ReplyToTopic("test-echo-resp"); + x.DefaultTimeout(TimeSpan.FromSeconds(60)); +}); +``` + +Each service instance should have a dedicated queue or topic to ensure the response arrives back at the correct instance. This ensures the internal task `Task` of `bus.Send(TRequest)` resumes correctly. + +### Handling Request Messages + +For services handling requests, you must configure the request consumption settings for a specific queue (or topic): + +Example for queue: + +```csharp +mbb.Handle(x => x + .Queue(queue) + .WithHandler()); +``` + +Example for topic: + +```csharp +mbb.Handle(x => x + .Topic(topic) + .SubscriptionName("handler") + .WithHandler()); +``` + +Ensure that if a request is sent to a queue, the consumer is also listening on that queue (and similarly for topics). Mixing queues and topics for requests and responses is not supported. + +## Topology Provisioning + +Amazon SQS can automatically create any queues declared in your SMB configuration when needed. This process occurs when the SMB instance starts, and only for queues that do not yet exist. If a queue already exists, it will not be modified. + +> **Note**: Automatic topology creation is enabled by default. + +To disable automatic topology provisioning: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.TopologyProvisioning.Enabled = false; +}); +``` + +You can also customize how queues are created by modifying the `CreateQueueOptions`: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.TopologyProvisioning = new() + { + CreateQueueOptions = (options) => + { + // Customize queue options here + } + }; +}); +``` + +You can control which services create queues: + +```csharp +mbb.WithProviderAmazonSQS(cfg => +{ + cfg.TopologyProvisioning = new() + { + Enabled = true, + CanProducerCreateQueue = true, // Only producers can create queues + CanConsumerCreateQueue = false, // Consumers cannot create queues + }; +}); +``` + +> By default, both flags are enabled (`true`). + +This flexibility allows you to define ownership of queues/topics clearly—e.g., producers handle queue creation while consumers manage subscriptions. diff --git a/docs/provider_memory.md b/docs/provider_memory.md index ac9777c8..df12b4df 100644 --- a/docs/provider_memory.md +++ b/docs/provider_memory.md @@ -203,7 +203,7 @@ services.AddSlimMessageBus(mbb => When the `.Publish()` is invoked in the non-blocking mode: -- the consumers will be executed in another async task (in the background), +- the consumers will be executed in another async task (in thMemoryMessageBusIte background), - that task cancellation token will be bound to the message bus lifecycle (consumers are stopped, the bus is disposed or application shuts down), - the order of message delivery to consumer will match the order of publish, - however, when number of concurrent consumer instances > 0 (`.Instances(N)`) then up to N messages will be processed concurrently (having impact on ordering) diff --git a/docs/provider_memory.t.md b/docs/provider_memory.t.md index 347d678e..e557b61a 100644 --- a/docs/provider_memory.t.md +++ b/docs/provider_memory.t.md @@ -191,7 +191,7 @@ To use non-blocking publish use the `EnableBlockingPublish` property setting: When the `.Publish()` is invoked in the non-blocking mode: -- the consumers will be executed in another async task (in the background), +- the consumers will be executed in another async task (in thMemoryMessageBusIte background), - that task cancellation token will be bound to the message bus lifecycle (consumers are stopped, the bus is disposed or application shuts down), - the order of message delivery to consumer will match the order of publish, - however, when number of concurrent consumer instances > 0 (`.Instances(N)`) then up to N messages will be processed concurrently (having impact on ordering) diff --git a/src/Samples/Sample.Simple.ConsoleApp/Program.cs b/src/Samples/Sample.Simple.ConsoleApp/Program.cs index 08552f5f..2f571e5f 100644 --- a/src/Samples/Sample.Simple.ConsoleApp/Program.cs +++ b/src/Samples/Sample.Simple.ConsoleApp/Program.cs @@ -14,6 +14,7 @@ using SlimMessageBus; using SlimMessageBus.Host; +using SlimMessageBus.Host.AmazonSQS; using SlimMessageBus.Host.AzureEventHub; using SlimMessageBus.Host.AzureServiceBus; using SlimMessageBus.Host.Kafka; @@ -28,6 +29,7 @@ enum Provider AzureEventHub, Redis, Memory, + AmazonSQS } /// @@ -166,10 +168,9 @@ private static void ConfigureMessageBus(MessageBusBuilder mbb, IConfiguration co //.WithConsumer(nameof(AddCommandConsumer.OnHandle)) //.WithConsumer((consumer, message, name) => consumer.OnHandle(message, name)) .KafkaGroup(consumerGroup) // for Apache Kafka - .EventHubGroup(consumerGroup) // for Azure Event Hub - // for Azure Service Bus - .SubscriptionName(consumerGroup) - .SubscriptionSqlFilter("2=2") + .EventHubGroup(consumerGroup) // for Azure Event Hub + .SubscriptionName(consumerGroup) // for Azure Service Bus + .SubscriptionSqlFilter("2=2") // for Azure Service Bus .CreateTopicOptions((options) => { options.RequiresDuplicateDetection = true; @@ -242,7 +243,7 @@ private static void ConfigureMessageBus(MessageBusBuilder mbb, IConfiguration co builder.WithProviderServiceBus(cfg => { cfg.ConnectionString = serviceBusConnectionString; - cfg.TopologyProvisioning = new ServiceBusTopologySettings() + cfg.TopologyProvisioning = new() { Enabled = true, CreateQueueOptions = (options) => @@ -319,14 +320,25 @@ void AddSsl(ClientConfig c) break; case Provider.Redis: - // Ensure your Kafka broker is running - var redisConnectionString = Secrets.Service.PopulateSecrets(configuration["Redis:ConnectionString"]); - + var redisConnectionString = Secrets.Service.PopulateSecrets(configuration["Redis:ConnectionString"]); + // Or use Redis as provider builder.WithProviderRedis(cfg => { cfg.ConnectionString = redisConnectionString; }); + break; + + case Provider.AmazonSQS: + var accessKey = Secrets.Service.PopulateSecrets(configuration["Amazon:AccessKey"]); + var secretAccess = Secrets.Service.PopulateSecrets(configuration["Amazon:SecretAccess"]); + + // Or use Amazon SQS as provider + builder.WithProviderAmazonSQS(cfg => + { + cfg.UseRegion(Amazon.RegionEndpoint.EUCentral1); + cfg.UseCredentials(accessKey, secretAccess); + }); break; } }); diff --git a/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj b/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj index bf1a5f0e..27646aa4 100644 --- a/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj +++ b/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Samples/Sample.Simple.ConsoleApp/appsettings.json b/src/Samples/Sample.Simple.ConsoleApp/appsettings.json index dcce6a12..da90185f 100644 --- a/src/Samples/Sample.Simple.ConsoleApp/appsettings.json +++ b/src/Samples/Sample.Simple.ConsoleApp/appsettings.json @@ -22,5 +22,9 @@ }, "Redis": { "ConnectionString": "{{redis_connectionstring}}" + }, + "Amazon": { + "AccessKey": "{{amazon_access_key}}", + "SecretAccessKey": "{{amazon_secret_access_key}}" } } diff --git a/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/ISqsClientProvider.cs b/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/ISqsClientProvider.cs new file mode 100644 index 00000000..73b4da01 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/ISqsClientProvider.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public interface ISqsClientProvider +{ + AmazonSQSClient Client { get; } + Task EnsureClientAuthenticated(); +} + + diff --git a/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/StaticCredentialsSqsClientProvider.cs b/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/StaticCredentialsSqsClientProvider.cs new file mode 100644 index 00000000..5aa259ce --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/StaticCredentialsSqsClientProvider.cs @@ -0,0 +1,45 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public class StaticCredentialsSqsClientProvider : ISqsClientProvider, IDisposable +{ + private bool _disposedValue; + + private readonly AmazonSQSClient _client; + + public StaticCredentialsSqsClientProvider(AmazonSQSConfig sqsConfig, AWSCredentials credentials) + => _client = new AmazonSQSClient(credentials, sqsConfig); + + #region ISqsClientProvider + + public AmazonSQSClient Client => _client; + + public Task EnsureClientAuthenticated() => Task.CompletedTask; + + #endregion + + #region Dispose Pattern + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _client?.Dispose(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + #endregion +} + + diff --git a/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/TemporaryCredentialsSqsClientProvider.cs b/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/TemporaryCredentialsSqsClientProvider.cs new file mode 100644 index 00000000..04890d90 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/ClientFactory/TemporaryCredentialsSqsClientProvider.cs @@ -0,0 +1,102 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +using Amazon.SecurityToken; +using Amazon.SecurityToken.Model; + +public class TemporaryCredentialsSqsClientProvider : ISqsClientProvider, IDisposable +{ + private bool _disposedValue; + + private readonly AmazonSQSConfig _sqsConfig; + private readonly string _roleArn; + private readonly string _roleSessionName; + + private readonly AmazonSecurityTokenServiceClient _stsClient; + private readonly Timer _timer; + private readonly SemaphoreSlim _semaphoreSlim = new(1, 1); + + private AmazonSQSClient _client; + private DateTime _clientCredentialsExpiry; + + public TemporaryCredentialsSqsClientProvider(AmazonSQSConfig sqsConfig, string roleArn, string roleSessionName) + { + _stsClient = new AmazonSecurityTokenServiceClient(); + _sqsConfig = sqsConfig; + _roleArn = roleArn; + _roleSessionName = roleSessionName; + _timer = new Timer(state => _ = EnsureClientAuthenticated(), null, TimeSpan.Zero, TimeSpan.FromMinutes(1)); + } + + #region ISqsClientProvider + + public AmazonSQSClient Client => _client; + + public async Task EnsureClientAuthenticated() + { + if (_client == null || DateTime.UtcNow >= _clientCredentialsExpiry) + { + await _semaphoreSlim.WaitAsync(); + try + { + var oldClient = _client; + (_client, _clientCredentialsExpiry) = await RefreshCredentialsAsync(); + oldClient?.Dispose(); + } + finally + { + _semaphoreSlim.Release(); + } + } + } + + #endregion + + private async Task<(AmazonSQSClient Client, DateTime ClientExpiry)> RefreshCredentialsAsync() + { + var assumeRoleRequest = new AssumeRoleRequest + { + RoleArn = _roleArn, + RoleSessionName = _roleSessionName + }; + + var assumeRoleResponse = await _stsClient.AssumeRoleAsync(assumeRoleRequest); + + var temporaryCredentials = new SessionAWSCredentials( + assumeRoleResponse.Credentials.AccessKeyId, + assumeRoleResponse.Credentials.SecretAccessKey, + assumeRoleResponse.Credentials.SessionToken + ); + + var clientCredentialsExpiry = assumeRoleResponse.Credentials.Expiration.AddMinutes(-5); // Renew 5 mins before expiry + + var client = new AmazonSQSClient(temporaryCredentials, _sqsConfig); + return (client, clientCredentialsExpiry); + } + + #region Dispose Pattern + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _client?.Dispose(); + _stsClient?.Dispose(); + _timer?.Dispose(); + } + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + #endregion +} + + diff --git a/src/SlimMessageBus.Host.AmazonSQS/Config/Delegates.cs b/src/SlimMessageBus.Host.AmazonSQS/Config/Delegates.cs new file mode 100644 index 00000000..76abe51e --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Config/Delegates.cs @@ -0,0 +1,19 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +/// +/// Allows to convert an message and headers into a Message Group Id (used for FIFO queues to group messages together and ensure order of processing). +/// +/// +/// +/// +/// +public delegate string MessageGroupIdProvider(T message, IDictionary headers); + +/// +/// Allows to convert an message and headers into a Message Deduplication Id (Amazon SQS performs deduplication within a 5-minute window). +/// +/// +/// +/// +/// +public delegate string MessageDeduplicationIdProvider(T message, IDictionary headers); diff --git a/src/SlimMessageBus.Host.AmazonSQS/Config/SqsConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsConsumerBuilderExtensions.cs new file mode 100644 index 00000000..427a19ff --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsConsumerBuilderExtensions.cs @@ -0,0 +1,69 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public static class SqsConsumerBuilderExtensions +{ + public static TConsumerBuilder Queue(this TConsumerBuilder consumerBuilder, string queue) + where TConsumerBuilder : AbstractConsumerBuilder + { + if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder)); + if (queue is null) throw new ArgumentNullException(nameof(queue)); + + consumerBuilder.ConsumerSettings.PathKind = PathKind.Queue; + consumerBuilder.ConsumerSettings.Path = queue; + return consumerBuilder; + } + + /// + /// Specifies the visibility timeout for the message. Default is 30 seconds. + /// for more information. + /// + /// + /// + /// + /// + /// + /// + public static ConsumerBuilder VisibilityTimeout(this ConsumerBuilder consumerBuilder, int visibilityTimeoutSeconds) + { + if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder)); + if (visibilityTimeoutSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(visibilityTimeoutSeconds)); + + SqsProperties.VisibilityTimeout.Set(consumerBuilder.Settings, visibilityTimeoutSeconds); + return consumerBuilder; + } + + /// + /// Specifies the maximum number of messages to receive in a single poll. Default is 1, maximum is 10. + /// for more information. + /// + /// + /// + /// + /// + /// + /// + public static ConsumerBuilder MaxMessages(this ConsumerBuilder consumerBuilder, int maxMessages) + { + if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder)); + if (maxMessages <= 0 || maxMessages > 10) throw new ArgumentOutOfRangeException(nameof(maxMessages)); + + SqsProperties.MaxMessages.Set(consumerBuilder.Settings, maxMessages); + return consumerBuilder; + } + + /// + /// Specifies the message attribute names to fetch. Default is "All". + /// + /// + /// + /// + /// + /// + public static ConsumerBuilder FetchMessageAttributes(this ConsumerBuilder consumerBuilder, params string[] messageAttributeNames) + { + if (consumerBuilder is null) throw new ArgumentNullException(nameof(consumerBuilder)); + + SqsProperties.MessageAttributes.Set(consumerBuilder.Settings, messageAttributeNames); + return consumerBuilder; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProducerBuilderExtensions.cs b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProducerBuilderExtensions.cs new file mode 100644 index 00000000..7ea2107a --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProducerBuilderExtensions.cs @@ -0,0 +1,130 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public static class SqsProducerBuilderExtensions +{ + public static ProducerBuilder DefaultQueue(this ProducerBuilder producerBuilder, string queue) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + if (queue is null) throw new ArgumentNullException(nameof(queue)); + + producerBuilder.ToQueue(); + return producerBuilder.DefaultPath(queue); + } + + /// + /// The path parameter name in should be treated as a SNS topic name + /// + /// + /// + /// + public static ProducerBuilder ToTopic(this ProducerBuilder producerBuilder) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + + producerBuilder.Settings.PathKind = PathKind.Topic; + return producerBuilder; + } + + /// + /// The path parameter name in should be treated as a SQS queue name + /// + /// + /// + /// + public static ProducerBuilder ToQueue(this ProducerBuilder producerBuilder) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + + producerBuilder.Settings.PathKind = PathKind.Queue; + return producerBuilder; + } + + /// + /// Enables FIFO support for the queue when it will be provisioned. + /// + /// + /// + /// + /// + public static ProducerBuilder EnableFifo(this ProducerBuilder producerBuilder, Action> builder = null) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + + SqsProperties.EnableFifo.Set(producerBuilder.Settings, true); + + builder?.Invoke(new SqsProducerFifoBuilder(producerBuilder.Settings)); + + return producerBuilder; + } + + /// + /// Sets the tags for the queue when it will be provisioned. + /// + /// + /// + /// + /// + /// + public static ProducerBuilder Tags(this ProducerBuilder producerBuilder, Dictionary tags) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + if (tags is null) throw new ArgumentNullException(nameof(tags)); + + SqsProperties.Tags.Set(producerBuilder.Settings, tags); + + return producerBuilder; + } + + /// + /// Sets the attributes for the queue when it will be provisioned. See the available names in . + /// + /// + /// + /// + /// + /// + public static ProducerBuilder Attributes(this ProducerBuilder producerBuilder, Dictionary attributes) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + if (attributes is null) throw new ArgumentNullException(nameof(attributes)); + + SqsProperties.Attributes.Set(producerBuilder.Settings, attributes); + + return producerBuilder; + } + + /// + /// Sets the for the queue when it will be provisioned. + /// + /// + /// + /// + /// + /// + public static ProducerBuilder VisibilityTimeout(this ProducerBuilder producerBuilder, int visibilityTimeoutSeconds) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + + SqsProperties.VisibilityTimeout.Set(producerBuilder.Settings, visibilityTimeoutSeconds); + + return producerBuilder; + } + + /// + /// Sets the for the queue when it will be provisioned. + /// + /// + /// + /// + /// + /// + public static ProducerBuilder Policy(this ProducerBuilder producerBuilder, string policy) + { + if (producerBuilder is null) throw new ArgumentNullException(nameof(producerBuilder)); + if (policy is null) throw new ArgumentNullException(nameof(policy)); + + SqsProperties.Policy.Set(producerBuilder.Settings, policy); + + return producerBuilder; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProducerFifoBuilder.cs b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProducerFifoBuilder.cs new file mode 100644 index 00000000..ae6f40f4 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProducerFifoBuilder.cs @@ -0,0 +1,34 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public class SqsProducerFifoBuilder(ProducerSettings producerSettings) +{ + /// + /// Used for FIFO queues to provide a message group id in order to group messages together and ensure order of processing. + /// + /// + /// + /// + /// + public SqsProducerFifoBuilder GroupId(MessageGroupIdProvider provider) + { + if (provider is null) throw new ArgumentNullException(nameof(provider)); + + SqsProperties.MessageGroupId.Set(producerSettings, (message, headers) => provider((T)message, headers)); + return this; + } + + /// + /// Used to specfiy a message deduplication id for the message. This is used to prevent duplicate messages from being sent (Amazon SQS performs deduplication within a 5-minute window). + /// + /// + /// + /// + /// + public SqsProducerFifoBuilder DeduplicationId(MessageDeduplicationIdProvider provider) + { + if (provider is null) throw new ArgumentNullException(nameof(provider)); + + SqsProperties.MessageDeduplicationId.Set(producerSettings, (message, headers) => provider((T)message, headers)); + return this; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProperties.cs b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProperties.cs new file mode 100644 index 00000000..58a1273a --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsProperties.cs @@ -0,0 +1,18 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +static internal class SqsProperties +{ + // producer + static readonly internal ProviderExtensionProperty EnableFifo = new("Sqs_EnableFifo"); + static readonly internal ProviderExtensionProperty> MessageGroupId = new("Sqs_MessageGroupId"); + static readonly internal ProviderExtensionProperty> MessageDeduplicationId = new("Sqs_MessageDeduplicationId"); + static readonly internal ProviderExtensionProperty> Tags = new("Sqs_Tags"); + static readonly internal ProviderExtensionProperty> Attributes = new("Sqs_Attributes"); + static readonly internal ProviderExtensionProperty Policy = new("Sqs_Policy"); + + // consumer + static readonly internal ProviderExtensionProperty MaxMessages = new("Sqs_MaxMessages"); + static readonly internal ProviderExtensionProperty VisibilityTimeout = new("Sqs_VisibilityTimeout"); + static readonly internal ProviderExtensionProperty WaitTimeSeconds = new("Sqs_WaitTimeSeconds"); + static readonly internal ProviderExtensionProperty MessageAttributes = new("Sqs_MessageAttributes"); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Config/SqsRequestResponseBuilderExtensions.cs b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsRequestResponseBuilderExtensions.cs new file mode 100644 index 00000000..3f6fd71a --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Config/SqsRequestResponseBuilderExtensions.cs @@ -0,0 +1,17 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public static class SqsRequestResponseBuilderExtensions +{ + public static RequestResponseBuilder ReplyToQueue(this RequestResponseBuilder builder, string queue, Action builderConfig = null) + { + if (builder is null) throw new ArgumentNullException(nameof(builder)); + if (queue is null) throw new ArgumentNullException(nameof(queue)); + + builder.Settings.Path = queue; + builder.Settings.PathKind = PathKind.Queue; + + builderConfig?.Invoke(builder); + + return builder; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs new file mode 100644 index 00000000..6172cfb5 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs @@ -0,0 +1,5 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public interface ISqsConsumerErrorHandler : IConsumerErrorHandler +{ +} diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs new file mode 100644 index 00000000..95818eb7 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs @@ -0,0 +1,145 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public abstract class SqsBaseConsumer : AbstractConsumer +{ + private readonly ISqsClientProvider _clientProvider; + + // consumer settings + private readonly int _maxMessages; + private readonly int _visibilityTimeout; + private readonly List _messageAttributeNames; + + private Task _task; + + public SqsMessageBus MessageBus { get; } + protected IMessageProcessor MessageProcessor { get; } + protected string Path { get; } + protected ISqsHeaderSerializer HeaderSerializer { get; } + + protected SqsBaseConsumer( + SqsMessageBus messageBus, + ISqsClientProvider clientProvider, + string path, + IMessageProcessor messageProcessor, + IEnumerable consumerSettings, + ILogger logger) + : base(logger ?? throw new ArgumentNullException(nameof(logger))) + { + MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); + _clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider)); + Path = path ?? throw new ArgumentNullException(nameof(path)); + MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor)); + HeaderSerializer = messageBus.HeaderSerializer; + T GetSingleValue(Func selector, string settingName, T defaultValue = default) + { + var set = consumerSettings.Select(x => selector(x)).Where(x => x is not null && !x.Equals(defaultValue)).ToHashSet(); + if (set.Count > 1) + { + throw new ConfigurationMessageBusException($"All declared consumers across the same queue {path} must have the same {settingName} settings."); + } + return set.FirstOrDefault() ?? defaultValue; + } + + _maxMessages = GetSingleValue(x => x.GetOrDefault(SqsProperties.MaxMessages), nameof(SqsConsumerBuilderExtensions.MaxMessages)) ?? messageBus.ProviderSettings.MaxMessageCount; + _visibilityTimeout = GetSingleValue(x => x.GetOrDefault(SqsProperties.VisibilityTimeout), nameof(SqsConsumerBuilderExtensions.VisibilityTimeout)) ?? 30; + _messageAttributeNames = new List(GetSingleValue(x => x.GetOrDefault(SqsProperties.MessageAttributes), nameof(SqsConsumerBuilderExtensions.FetchMessageAttributes)) ?? ["All"]); + } + + private async Task> ReceiveMessagesByUrl(string queueUrl) + { + var messageResponse = await _clientProvider.Client.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = queueUrl, + MessageAttributeNames = _messageAttributeNames, + MaxNumberOfMessages = _maxMessages, + VisibilityTimeout = _visibilityTimeout, + // For information about long polling, see + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html + // Setting WaitTimeSeconds to non-zero enables long polling. + WaitTimeSeconds = 5 + }, CancellationToken); + + return messageResponse.Messages; + } + + private async Task DeleteMessageBatchByUrl(string queueUrl, IReadOnlyCollection messages) + { + var deleteRequest = new DeleteMessageBatchRequest + { + QueueUrl = queueUrl, + Entries = new List(messages.Count) + }; + foreach (var message in messages) + { + deleteRequest.Entries.Add(new DeleteMessageBatchRequestEntry + { + Id = message.MessageId, + ReceiptHandle = message.ReceiptHandle + }); + } + + var deleteResponse = await _clientProvider.Client.DeleteMessageBatchAsync(deleteRequest, CancellationToken); + + // ToDo: capture failed messages + return deleteResponse.Failed.Count > 0; + } + + protected override Task OnStart() + { + Logger.LogInformation("Starting consumer for Queue: {Queue}", Path); + _task = Run(); + return Task.CompletedTask; + } + + protected override async Task OnStop() + { + Logger.LogInformation("Stopping consumer for Queue: {Queue}", Path); + await _task.ConfigureAwait(false); + _task = null; + } + + protected async Task Run() + { + var queueUrl = MessageBus.GetQueueUrlOrException(Path); + + var messagesToDelete = new List(_maxMessages); + + while (!CancellationToken.IsCancellationRequested) + { + try + { + var messages = await ReceiveMessagesByUrl(queueUrl).ConfigureAwait(false); + foreach (var message in messages) + { + var messageHeaders = message + .MessageAttributes + .ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, x.Value)); + + var r = await MessageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false); + if (r.Exception != null) + { + Logger.LogError(r.Exception, "Message processing error - Queue: {Queue}, MessageId: {MessageId}", Path, message.MessageId); + // ToDo: DLQ handling + break; + } + messagesToDelete.Add(message); + } + + if (messagesToDelete.Count > 0) + { + await DeleteMessageBatchByUrl(queueUrl, messagesToDelete).ConfigureAwait(false); + messagesToDelete.Clear(); + } + } + catch (TaskCanceledException) + { + // ignore, need to finish + } + catch (Exception ex) + { + Logger.LogError(ex, "Error while processing messages - Queue: {Queue}", Path); + await Task.Delay(2000, CancellationToken).ConfigureAwait(false); + } + } + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsConsumerContextExtensions.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsConsumerContextExtensions.cs new file mode 100644 index 00000000..9587ff9b --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsConsumerContextExtensions.cs @@ -0,0 +1,20 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public static class SqsConsumerContextExtensions +{ + private const string MessageKey = "Sqs_Message"; + + public static Message GetTransportMessage(this IConsumerContext context) + { + if (context is null) throw new ArgumentNullException(nameof(context)); + + return context.GetPropertyOrDefault(MessageKey); + } + + internal static void SetTransportMessage(this ConsumerContext context, Message message) + { + if (context is null) throw new ArgumentNullException(nameof(context)); + + context.Properties[MessageKey] = message; + } +} diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsQueueConsumer.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsQueueConsumer.cs new file mode 100644 index 00000000..16a35d38 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsQueueConsumer.cs @@ -0,0 +1,16 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public class SqsQueueConsumer( + SqsMessageBus messageBus, + string path, + ISqsClientProvider clientProvider, + IMessageProcessor messageProcessor, + IEnumerable consumerSettings) + : SqsBaseConsumer(messageBus, + clientProvider, + path, + messageProcessor, + consumerSettings, + messageBus.LoggerFactory.CreateLogger()) +{ +} diff --git a/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs b/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs new file mode 100644 index 00000000..ea8ffa7a --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs @@ -0,0 +1,10 @@ +global using Amazon; +global using Amazon.Runtime; +global using Amazon.SQS; +global using Amazon.SQS.Model; + +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; +global using Microsoft.Extensions.Logging; + +global using SlimMessageBus.Host.Serialization; diff --git a/src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs b/src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs new file mode 100644 index 00000000..a6a5f6f4 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs @@ -0,0 +1,32 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public class DefaultSqsHeaderSerializer(bool detectStringType = true) : ISqsHeaderSerializer +{ + const string DataTypeNumber = "Number"; + const string DataTypeString = "String"; + + public MessageAttributeValue Serialize(string key, object value) => value switch + { + // See more https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes + var x when x is long || x is int || x is short || x is byte => new MessageAttributeValue + { + DataType = DataTypeNumber, + StringValue = value.ToString() + }, + _ => new MessageAttributeValue + { + DataType = DataTypeString, + StringValue = value?.ToString() + } + }; + + public object Deserialize(string key, MessageAttributeValue value) => value.DataType switch + { + DataTypeNumber when long.TryParse(value.StringValue, out var longValue) => longValue, + DataTypeString when detectStringType && key != ReqRespMessageHeaders.RequestId && Guid.TryParse(value.StringValue, out var guid) => guid, + DataTypeString when detectStringType && bool.TryParse(value.StringValue, out var b) => b, + DataTypeString when detectStringType && DateTime.TryParse(value.StringValue, out var dt) => dt, + DataTypeString => value.StringValue, + _ => null + }; +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Headers/ISqsHeaderSerializer.cs b/src/SlimMessageBus.Host.AmazonSQS/Headers/ISqsHeaderSerializer.cs new file mode 100644 index 00000000..026f3106 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/Headers/ISqsHeaderSerializer.cs @@ -0,0 +1,7 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public interface ISqsHeaderSerializer +{ + MessageAttributeValue Serialize(string key, object value); + object Deserialize(string key, MessageAttributeValue value); +} diff --git a/src/SlimMessageBus.Host.AmazonSQS/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.AmazonSQS/MessageBusBuilderExtensions.cs new file mode 100644 index 00000000..73ccaccd --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/MessageBusBuilderExtensions.cs @@ -0,0 +1,20 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public static class MessageBusBuilderExtensions +{ + public static MessageBusBuilder WithProviderAmazonSQS(this MessageBusBuilder mbb, Action configure) + { + if (mbb is null) throw new ArgumentNullException(nameof(mbb)); + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + var providerSettings = new SqsMessageBusSettings(); + configure(providerSettings); + + mbb.PostConfigurationActions.Add((services) => + { + services.TryAddSingleton(providerSettings.ClientProviderFactory); + }); + + return mbb.WithProvider(settings => new SqsMessageBus(settings, providerSettings)); + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/SlimMessageBus.Host.AmazonSQS.csproj b/src/SlimMessageBus.Host.AmazonSQS/SlimMessageBus.Host.AmazonSQS.csproj new file mode 100644 index 00000000..6c0f4335 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/SlimMessageBus.Host.AmazonSQS.csproj @@ -0,0 +1,23 @@ + + + + + + Amazon SQS provider for SlimMessageBus + See https://github.com/zarusz/SlimMessageBus/releases + Amazon AWS SQS provider SlimMessageBus MessageBus bus facade messaging + icon.png + latest + + + + + + + + + + + + + diff --git a/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs new file mode 100644 index 00000000..6ff977df --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs @@ -0,0 +1,238 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +using SlimMessageBus.Host.Serialization; + +public class SqsMessageBus : MessageBusBase +{ + private readonly ILogger _logger; + private readonly ISqsClientProvider _clientProvider; + private readonly Dictionary _queueUrlByPath = []; + private readonly IMessageSerializer _messageSerializer; + + public ISqsHeaderSerializer HeaderSerializer { get; } + + public SqsMessageBus(MessageBusSettings settings, SqsMessageBusSettings providerSettings) + : base(settings, providerSettings) + { + _logger = LoggerFactory.CreateLogger(); + _clientProvider = settings.ServiceProvider.GetRequiredService(); + _messageSerializer = Serializer as IMessageSerializer + ?? throw new ConfigurationMessageBusException($"Serializer for Amazon SQS must be able to serialize into a string (it needs to implement {nameof(IMessageSerializer)})"); + HeaderSerializer = providerSettings.SqsHeaderSerializer; + OnBuildProvider(); + } + + protected override void Build() + { + base.Build(); + InitTaskList.Add(InitAsync, CancellationToken); + } + + protected override async Task CreateConsumers() + { + await base.CreateConsumers(); + + void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor messageProcessor, IEnumerable consumerSettings) + { + if (pathKind == PathKind.Queue) + { + _logger.LogInformation("Creating consumer for Queue: {Queue}", path); + var consumer = new SqsQueueConsumer(this, path, _clientProvider, messageProcessor, consumerSettings); + AddConsumer(consumer); + } + } + + static void InitConsumerContext(Message m, ConsumerContext ctx) => ctx.SetTransportMessage(m); + + object MessageProvider(Type messageType, Message transportMessage) => _messageSerializer.Deserialize(messageType, transportMessage.Body); + + foreach (var ((path, pathKind), consumerSettings) in Settings.Consumers + .GroupBy(x => (x.Path, x.PathKind)) + .ToDictionary(x => x.Key, x => x.ToList())) + { + var messageProcessor = new MessageProcessor( + consumerSettings, + this, + messageProvider: MessageProvider, + path: path, + responseProducer: this, + consumerContextInitializer: InitConsumerContext, + consumerErrorHandlerOpenGenericType: typeof(ISqsConsumerErrorHandler<>)); + + AddConsumerFrom(path, pathKind, messageProcessor, consumerSettings); + } + + if (Settings.RequestResponse != null) + { + var messageProcessor = new ResponseMessageProcessor( + LoggerFactory, + Settings.RequestResponse, + messageProvider: MessageProvider, + PendingRequestStore, + CurrentTimeProvider); + + AddConsumerFrom( + Settings.RequestResponse.Path, + Settings.RequestResponse.PathKind, + messageProcessor, + [Settings.RequestResponse]); + } + } + + /// + /// Performs initialization that has to happen before the first message produce happens. + /// + /// + private async Task InitAsync() + { + try + { + _logger.LogInformation("Ensuring client is authenticate"); + // Ensure the client finished the first authentication + await _clientProvider.EnsureClientAuthenticated(); + + // Provision the topology if enabled + if (ProviderSettings.TopologyProvisioning?.Enabled ?? false) + { + _logger.LogInformation("Provisioning topology"); + await ProvisionTopology(); + } + + // Read the Queue/Topic URLs for the producers + _logger.LogInformation("Populating queue URLs"); + await PopulatePathToUrlMappings(); + } + catch (Exception ex) + { + _logger.LogError(ex, "SQS Transport initialization failed: {ErrorMessage}", ex.Message); + } + } + + public override async Task ProvisionTopology() + { + await base.ProvisionTopology(); + + var provisioningService = new SqsTopologyService(LoggerFactory.CreateLogger(), Settings, ProviderSettings, _clientProvider); + await provisioningService.ProvisionTopology(); // provisioning happens asynchronously + } + + private async Task PopulatePathToUrlMappings() + { + var queuePaths = Settings.Producers.Where(x => x.PathKind == PathKind.Queue).Select(x => x.DefaultPath) + .Concat(Settings.Consumers.Where(x => x.PathKind == PathKind.Queue).Select(x => x.Path)) + .Concat(Settings.RequestResponse?.PathKind == PathKind.Queue ? [Settings.RequestResponse.Path] : []) + .ToHashSet(); + + foreach (var queuePath in queuePaths) + { + try + { + _logger.LogDebug("Populating URL for queue {QueueName}", queuePath); + var queueResponse = await _clientProvider.Client.GetQueueUrlAsync(queuePath, CancellationToken); + _queueUrlByPath[queuePath] = queueResponse.QueueUrl; + } + catch (QueueDoesNotExistException ex) + { + _logger.LogError(ex, "Queue {QueueName} does not exist, ensure that it either exists or topology provisioning is enabled", queuePath); + } + } + } + internal string GetQueueUrlOrException(string path) + { + if (_queueUrlByPath.TryGetValue(path, out var queueUrl)) + { + return queueUrl; + } + throw new ProducerMessageBusException($"Queue {path} has unknown URL at this point. Ensure the queue exists in Amazon SQS."); + } + + public override async Task ProduceToTransport(object message, Type messageType, string path, IDictionary messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken) + { + OnProduceToTransport(message, messageType, path, messageHeaders); + + var queueUrl = GetQueueUrlOrException(path); + try + { + var (payload, attributes, deduplicationId, groupId) = GetTransportMessage(message, messageType, messageHeaders); + + await _clientProvider.Client.SendMessageAsync(new SendMessageRequest(queueUrl, payload) + { + MessageAttributes = attributes, + MessageDeduplicationId = deduplicationId, + MessageGroupId = groupId + }, cancellationToken); + } + catch (Exception ex) when (ex is not ProducerMessageBusException && ex is not TaskCanceledException) + { + throw new ProducerMessageBusException(GetProducerErrorMessage(path, message, messageType, ex), ex); + } + } + + // Chunk if exceeds 10 messages and payload size (Amazon SQS limits) + private const int MaxMessagesInBatch = 10; + + public override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) + { + var dispatched = new List(envelopes.Count); + try + { + var queueUrl = GetQueueUrlOrException(path); + + var entries = new List(MaxMessagesInBatch); + + var envelopeChunks = envelopes.Chunk(MaxMessagesInBatch); + foreach (var envelopeChunk in envelopeChunks) + { + foreach (var envelope in envelopeChunk) + { + var (payload, attributes, deduplicationId, groupId) = GetTransportMessage(envelope.Message, envelope.MessageType, envelope.Headers); + + entries.Add(new SendMessageBatchRequestEntry(Guid.NewGuid().ToString(), payload) + { + MessageAttributes = attributes, + MessageDeduplicationId = deduplicationId, + MessageGroupId = groupId + }); + } + + await _clientProvider.Client.SendMessageBatchAsync(new SendMessageBatchRequest(queueUrl, entries), cancellationToken); + + entries.Clear(); + + dispatched.AddRange(envelopeChunk); + } + + return new(dispatched, null); + } + catch (Exception ex) + { + _logger.LogError(ex, "Producing message batch to path {Path} resulted in error {Error}", path, ex.Message); + return new(dispatched, ex); + } + } + + private (string Payload, Dictionary Attributes, string DeduplicationId, string GroupId) GetTransportMessage(object message, Type messageType, IDictionary messageHeaders) + { + var producerSettings = GetProducerSettings(messageType); + + var messageDeduplicationIdProvider = producerSettings.GetOrDefault(SqsProperties.MessageDeduplicationId, null); + var deduplicationId = messageDeduplicationIdProvider?.Invoke(message, messageHeaders); + + var messageGroupIdProvider = producerSettings.GetOrDefault(SqsProperties.MessageGroupId, null); + var groupId = messageGroupIdProvider?.Invoke(message, messageHeaders); + + Dictionary messageAttributes = null; + if (messageHeaders != null) + { + messageAttributes = []; + foreach (var header in messageHeaders) + { + var headerValue = HeaderSerializer.Serialize(header.Key, header.Value); + messageAttributes.Add(header.Key, headerValue); + } + } + + var messagePayload = _messageSerializer.Serialize(messageType, message); + return (messagePayload, messageAttributes, deduplicationId, groupId); + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs new file mode 100644 index 00000000..8783e48e --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs @@ -0,0 +1,67 @@ +namespace SlimMessageBus.Host.AmazonSQS; +public class SqsMessageBusSettings +{ + /// + /// The factory method to create the client provider which is used to manage the . + /// + public Func ClientProviderFactory { get; set; } + + /// + /// The configuration for the SQS client. + /// + public AmazonSQSConfig SqsClientConfig { get; set; } = new(); + + /// + /// Serializer used to serialize SQS message header values. + /// By default the is used. + /// + public ISqsHeaderSerializer SqsHeaderSerializer { get; set; } = new DefaultSqsHeaderSerializer(); + + /// + /// Settings for auto creation of queues if they don't exist. + /// + public SqsTopologySettings TopologyProvisioning { get; set; } = new(); + + /// + /// Connect to AWS using long term credentials. + /// See https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html + /// + /// + /// + /// + public SqsMessageBusSettings UseCredentials(string accessKey, string secretKey) + { + ClientProviderFactory = (svp) => new StaticCredentialsSqsClientProvider(SqsClientConfig, new BasicAWSCredentials(accessKey, secretKey)); + return this; + } + + /// + /// Connect to AWS using temporary credentials (recommended) + /// See https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS + /// + /// + /// + /// + public SqsMessageBusSettings UseTemporaryCredentials(string roleArn, string roleSessionName) + { + ClientProviderFactory = (svp) => new TemporaryCredentialsSqsClientProvider(SqsClientConfig, roleArn, roleSessionName); + return this; + } + + /// + /// Sets the region for the SQS client. + /// + /// + /// + public SqsMessageBusSettings UseRegion(RegionEndpoint region) + { + SqsClientConfig.RegionEndpoint = region; + return this; + } + + /// + /// Maximum message count to be recieved by the consumer in one batch (1-10). Default is 10. + /// + public int MaxMessageCount { get; set; } = 10; + +} diff --git a/src/SlimMessageBus.Host.AmazonSQS/SqsTopologyService.cs b/src/SlimMessageBus.Host.AmazonSQS/SqsTopologyService.cs new file mode 100644 index 00000000..b71b4654 --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/SqsTopologyService.cs @@ -0,0 +1,160 @@ +namespace SlimMessageBus.Host.AmazonSQS; +public class SqsTopologyService +{ + private readonly ILogger _logger; + private readonly MessageBusSettings _settings; + private readonly SqsMessageBusSettings _providerSettings; + private readonly ISqsClientProvider _clientProvider; + + public SqsTopologyService( + ILogger logger, + MessageBusSettings settings, + SqsMessageBusSettings providerSettings, + ISqsClientProvider clientProvider) + { + _logger = logger; + _settings = settings; + _providerSettings = providerSettings; + _clientProvider = clientProvider; + } + + public Task ProvisionTopology() => _providerSettings.TopologyProvisioning.OnProvisionTopology(_clientProvider.Client, DoProvisionTopology); + + private async Task CreateQueue(string path, + bool fifo, + int? visibilityTimeout, + string policy, + Dictionary attributes, + Dictionary tags) + { + try + { + try + { + var queueUrl = await _clientProvider.Client.GetQueueUrlAsync(path); + if (queueUrl != null) + { + return; + } + } + catch (QueueDoesNotExistException) + { + // proceed to create the queue + } + + var createQueueRequest = new CreateQueueRequest + { + QueueName = path, + Attributes = [] + }; + + if (fifo) + { + createQueueRequest.Attributes.Add(QueueAttributeName.FifoQueue, "true"); + } + + if (visibilityTimeout != null) + { + createQueueRequest.Attributes.Add(QueueAttributeName.VisibilityTimeout, visibilityTimeout.ToString()); + } + + if (policy != null) + { + createQueueRequest.Attributes.Add(QueueAttributeName.Policy, policy); + } + + if (attributes.Count > 0) + { + createQueueRequest.Attributes = attributes; + } + + if (tags.Count > 0) + { + createQueueRequest.Tags = tags; + } + + _providerSettings.TopologyProvisioning.CreateQueueOptions?.Invoke(createQueueRequest); + + try + { + var createQueueResponse = await _clientProvider.Client.CreateQueueAsync(createQueueRequest); + _logger.LogInformation("Created queue {QueueName} with URL {QueueUrl}", path, createQueueResponse.QueueUrl); + } + catch (QueueNameExistsException) + { + _logger.LogInformation("Queue {QueueName} already exists", path); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error creating queue {QueueName}", path); + } + } + + private async Task DoProvisionTopology() + { + try + { + _logger.LogInformation("Topology provisioning started..."); + + if (_providerSettings.TopologyProvisioning.CanConsumerCreateQueue) + { + var consumersSettingsByPath = _settings.Consumers + .OfType() + .Concat([_settings.RequestResponse]) + .Where(x => x != null) + .GroupBy(x => (x.Path, x.PathKind)) + .ToDictionary(x => x.Key, x => x.ToList()); + + foreach (var ((path, pathKind), consumerSettingsList) in consumersSettingsByPath) + { + if (pathKind == PathKind.Queue) + { + await CreateQueue( + path: path, + fifo: consumerSettingsList.Any(cs => cs.GetOrDefault(SqsProperties.EnableFifo, _settings, false)), + visibilityTimeout: consumerSettingsList.Select(cs => cs.GetOrDefault(SqsProperties.VisibilityTimeout, _settings, null)).FirstOrDefault(x => x != null), + policy: consumerSettingsList.Select(cs => cs.GetOrDefault(SqsProperties.Policy, _settings, null)).FirstOrDefault(x => x != null), + attributes: [], + tags: []); + } + } + } + + if (_providerSettings.TopologyProvisioning.CanProducerCreateQueue) + { + foreach (var producer in _settings.Producers) + { + var attributes = producer.GetOrDefault(SqsProperties.Attributes, []) + .Concat(_settings.GetOrDefault(SqsProperties.Attributes, [])) + .GroupBy(x => x.Key, x => x.Value) + .ToDictionary(x => x.Key, x => x.First()); + + var tags = producer.GetOrDefault(SqsProperties.Tags, []) + .Concat(_settings.GetOrDefault(SqsProperties.Tags, [])) + .GroupBy(x => x.Key, x => x.Value) + .ToDictionary(x => x.Key, x => x.First()); + + if (producer.PathKind == PathKind.Queue) + { + await CreateQueue( + path: producer.DefaultPath, + fifo: producer.GetOrDefault(SqsProperties.EnableFifo, _settings, false), + visibilityTimeout: producer.GetOrDefault(SqsProperties.VisibilityTimeout, _settings, null), + policy: producer.GetOrDefault(SqsProperties.Policy, _settings, null), + attributes: attributes, + tags: tags); + } + } + } + } + catch (Exception e) + { + _logger.LogError(e, "Could not provision Amazon SQS topology"); + } + finally + { + _logger.LogInformation("Topology provisioning finished"); + } + } +} diff --git a/src/SlimMessageBus.Host.AmazonSQS/SqsTopologySettings.cs b/src/SlimMessageBus.Host.AmazonSQS/SqsTopologySettings.cs new file mode 100644 index 00000000..d4dc9aab --- /dev/null +++ b/src/SlimMessageBus.Host.AmazonSQS/SqsTopologySettings.cs @@ -0,0 +1,34 @@ +namespace SlimMessageBus.Host.AmazonSQS; + +public class SqsTopologySettings +{ + /// + /// Indicates whether topology provisioning is enabled. Default is true. + /// + public bool Enabled { get; set; } = true; + /// + /// A filter that allows (or not) for declared producers to provision needed queues. True by default. + /// + public bool CanProducerCreateQueue { get; set; } = true; + /// + /// A filter that allows (or not) for declared consumers to provision needed queues. True by default. + /// + public bool CanConsumerCreateQueue { get; set; } = true; + /// + /// Default configuration to be applied when a topic needs to be created (). + /// + public Action CreateQueueOptions { get; set; } + + /// + /// Interceptor that allows to intercept the topology provisioning process. + /// + public SqsTopologyInterceptor OnProvisionTopology { get; set; } = (client, provision) => provision(); +} + +/// +/// Interceptor that allows to intercept the topology provisioning process and to apply custom logic before and after the provisioning process. +/// +/// The SQS client +/// Delegate allowing to perform topology provisioning +/// +public delegate Task SqsTopologyInterceptor(AmazonSQSClient client, Func provision); diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForConsumers.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForConsumers.cs index 58d23550..7182ac6c 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForConsumers.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForConsumers.cs @@ -7,7 +7,7 @@ public class EhPartitionConsumerForConsumers : EhPartitionConsumer { private readonly IEnumerable _consumerSettings; - public EhPartitionConsumerForConsumers(EventHubMessageBus messageBus, IEnumerable consumerSettings, GroupPathPartitionId pathGroupPartition) + public EhPartitionConsumerForConsumers(EventHubMessageBus messageBus, IEnumerable consumerSettings, GroupPathPartitionId pathGroupPartition, MessageProvider messageProvider) : base(messageBus, pathGroupPartition) { _consumerSettings = consumerSettings ?? throw new ArgumentNullException(nameof(consumerSettings)); @@ -16,7 +16,7 @@ public EhPartitionConsumerForConsumers(EventHubMessageBus messageBus, IEnumerabl MessageProcessor = new MessageProcessor( _consumerSettings, MessageBus, - messageProvider: GetMessageFromTransportMessage, + messageProvider: messageProvider, path: GroupPathPartition.ToString(), responseProducer: MessageBus, consumerContextInitializer: InitializeConsumerContext, @@ -30,9 +30,6 @@ protected ICheckpointTrigger CreateCheckpointTrigger() return f.Create(_consumerSettings); } - private object GetMessageFromTransportMessage(Type messageType, EventData e) - => MessageBus.Serializer.Deserialize(messageType, e.Body.ToArray()); - private static void InitializeConsumerContext(EventData nativeMessage, ConsumerContext consumerContext) => consumerContext.SetTransportMessage(nativeMessage); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForResponses.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForResponses.cs index 061fcc44..17439275 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForResponses.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForResponses.cs @@ -9,12 +9,18 @@ namespace SlimMessageBus.Host.AzureEventHub; /// public class EhPartitionConsumerForResponses : EhPartitionConsumer { - public EhPartitionConsumerForResponses(EventHubMessageBus messageBus, RequestResponseSettings requestResponseSettings, GroupPathPartitionId pathGroupPartition) + public EhPartitionConsumerForResponses( + EventHubMessageBus messageBus, + RequestResponseSettings requestResponseSettings, + GroupPathPartitionId pathGroupPartition, + MessageProvider messageProvider, + IPendingRequestStore pendingRequestStore, + ICurrentTimeProvider currentTimeProvider) : base(messageBus, pathGroupPartition) { if (requestResponseSettings == null) throw new ArgumentNullException(nameof(requestResponseSettings)); - MessageProcessor = new ResponseMessageProcessor(MessageBus.LoggerFactory, requestResponseSettings, MessageBus, messagePayloadProvider: eventData => eventData.EventBody.ToArray()); + MessageProcessor = new ResponseMessageProcessor(MessageBus.LoggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider); CheckpointTrigger = new CheckpointTrigger(requestResponseSettings, MessageBus.LoggerFactory); } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs index d8a79cbe..3aae9ded 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs @@ -55,17 +55,19 @@ protected override async Task CreateConsumers() { await base.CreateConsumers(); + object MessageProvider(Type messageType, EventData transportMessage) => Serializer.Deserialize(messageType, transportMessage.Body.ToArray()); + foreach (var (groupPath, consumerSettings) in Settings.Consumers.GroupBy(x => new GroupPath(path: x.Path, group: x.GetGroup())).ToDictionary(x => x.Key, x => x.ToList())) { _logger.LogInformation("Creating consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group); - AddConsumer(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition))); + AddConsumer(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition, MessageProvider))); } if (Settings.RequestResponse != null) { var pathGroup = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup()); _logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", pathGroup.Path, pathGroup.Group); - AddConsumer(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition))); + AddConsumer(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, CurrentTimeProvider))); } } diff --git a/src/SlimMessageBus.Host.AzureEventHub/SlimMessageBus.Host.AzureEventHub.csproj b/src/SlimMessageBus.Host.AzureEventHub/SlimMessageBus.Host.AzureEventHub.csproj index efdd6a9c..2ce27850 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/SlimMessageBus.Host.AzureEventHub.csproj +++ b/src/SlimMessageBus.Host.AzureEventHub/SlimMessageBus.Host.AzureEventHub.csproj @@ -3,9 +3,9 @@ - Azure Event Hubs provider SlimMessageBus MessageBus bus facade messaging + Ak,m Event Hubs provider for SlimMessageBus See https://github.com/zarusz/SlimMessageBus/releases - Azure Event Hubs provider for SlimMessageBus + Azure Event Hubs provider SlimMessageBus MessageBus bus facade messaging icon.png latest @@ -16,7 +16,6 @@ - diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/ServiceBusConsumerContextExtensions.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/ServiceBusConsumerContextExtensions.cs index ac500c35..a3aefb29 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/ServiceBusConsumerContextExtensions.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/ServiceBusConsumerContextExtensions.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host.AzureServiceBus; -using Azure.Messaging.ServiceBus; - public static class ServiceBusConsumerContextExtensions { private const string MessageKey = "ServiceBus_Message"; @@ -13,7 +11,7 @@ public static ServiceBusReceivedMessage GetTransportMessage(this IConsumerContex return context.GetPropertyOrDefault(MessageKey); } - public static void SetTransportMessage(this ConsumerContext context, ServiceBusReceivedMessage message) + internal static void SetTransportMessage(this ConsumerContext context, ServiceBusReceivedMessage message) { if (context is null) throw new ArgumentNullException(nameof(context)); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs index 156efc4e..cd87ed56 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs @@ -59,7 +59,7 @@ protected override void Build() if (ProviderSettings.TopologyProvisioning?.Enabled ?? false) { - AddInit(ProvisionTopology()); + InitTaskList.Add(ProvisionTopology, CancellationToken); } _client = ProviderSettings.ClientFactory(); @@ -86,6 +86,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso } static void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx) => ctx.SetTransportMessage(m); + object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => Serializer.Deserialize(messageType, m.Body.ToArray()); foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers .GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings))) @@ -95,7 +96,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso var messageProcessor = new MessageProcessor( consumerSettings, this, - messageProvider: (messageType, m) => Serializer.Deserialize(messageType, m.Body.ToArray()), + messageProvider: MessageProvider, path: path.ToString(), responseProducer: this, consumerContextInitializer: InitConsumerContext, @@ -110,8 +111,9 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso var messageProcessor = new ResponseMessageProcessor( LoggerFactory, Settings.RequestResponse, - responseConsumer: this, - messagePayloadProvider: m => m.Body.ToArray()); + MessageProvider, + PendingRequestStore, + CurrentTimeProvider); AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]); } diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs index 0be23396..f687fd0c 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs @@ -164,7 +164,7 @@ protected async Task DoProvisionTopology() var topologyProvisioning = _providerSettings.TopologyProvisioning; var consumersSettingsByPath = _settings.Consumers.OfType() - .Concat(new[] { _settings.RequestResponse }) + .Concat([_settings.RequestResponse]) .Where(x => x != null) .GroupBy(x => (x.Path, x.PathKind)) .ToDictionary(x => x.Key, x => x.ToList()); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj b/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj index 114d18e0..4652b894 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj +++ b/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj @@ -3,11 +3,11 @@ - latest Azure Service Bus provider for SlimMessageBus + See https://github.com/zarusz/SlimMessageBus/releases Azure Service Bus provider SlimMessageBus MessageBus bus facade messaging client icon.png - + latest @@ -19,9 +19,7 @@ - - <_Parameter1>SlimMessageBus.Host.AzureServiceBus.Test - + diff --git a/src/SlimMessageBus.Host.Configuration/Settings/HasProviderExtensions.cs b/src/SlimMessageBus.Host.Configuration/Settings/HasProviderExtensions.cs index e961a7a1..8da226bc 100644 --- a/src/SlimMessageBus.Host.Configuration/Settings/HasProviderExtensions.cs +++ b/src/SlimMessageBus.Host.Configuration/Settings/HasProviderExtensions.cs @@ -18,6 +18,9 @@ public T GetOrCreate(string key, Func factoryMethod) return typedValue; } + public T GetOrCreate(ProviderExtensionProperty property, Func factoryMethod) + => GetOrCreate(property.Key, factoryMethod); + public T GetOrDefault(string key, T defaultValue = default) { if (Properties.TryGetValue(key, out var value)) @@ -27,6 +30,9 @@ public T GetOrDefault(string key, T defaultValue = default) return defaultValue; } + public T GetOrDefault(ProviderExtensionProperty property, T defaultValue = default) + => GetOrDefault(property.Key, defaultValue); + public T GetOrDefault(string key, MessageBusSettings messageBusSettings, T defaultValue = default) { if (Properties.TryGetValue(key, out var value) @@ -36,7 +42,10 @@ public T GetOrDefault(string key, MessageBusSettings messageBusSettings, T de return (T)value; } return defaultValue; - } + } + + public T GetOrDefault(ProviderExtensionProperty property, MessageBusSettings messageBusSettings, T defaultValue = default) + => GetOrDefault(property.Key, messageBusSettings, defaultValue); public T GetOrDefault(string key, HasProviderExtensions parentSettings, T defaultValue = default) { @@ -46,5 +55,8 @@ public T GetOrDefault(string key, HasProviderExtensions parentSettings, T def return (T)value; } return defaultValue; - } + } + + public T GetOrDefault(ProviderExtensionProperty property, HasProviderExtensions parentSettings, T defaultValue = default) + => GetOrDefault(property.Key, parentSettings, defaultValue); } diff --git a/src/SlimMessageBus.Host.Configuration/Settings/ProviderExtensionProperty.cs b/src/SlimMessageBus.Host.Configuration/Settings/ProviderExtensionProperty.cs new file mode 100644 index 00000000..c82a4326 --- /dev/null +++ b/src/SlimMessageBus.Host.Configuration/Settings/ProviderExtensionProperty.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host; + +public class ProviderExtensionProperty(string key) +{ + public string Key { get; } = key; + + public void Set(HasProviderExtensions settings, T value) + => settings.Properties[Key] = value; +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForResponses.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForResponses.cs index 3b3cdfad..f4052905 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForResponses.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForResponses.cs @@ -8,7 +8,16 @@ namespace SlimMessageBus.Host.Kafka; /// public class KafkaPartitionConsumerForResponses : KafkaPartitionConsumer { - public KafkaPartitionConsumerForResponses(ILoggerFactory loggerFactory, RequestResponseSettings requestResponseSettings, string group, TopicPartition topicPartition, IKafkaCommitController commitController, IResponseConsumer responseConsumer, IMessageSerializer headerSerializer) + public KafkaPartitionConsumerForResponses( + ILoggerFactory loggerFactory, + RequestResponseSettings requestResponseSettings, + string group, + TopicPartition topicPartition, + IKafkaCommitController commitController, + MessageProvider messageProvider, + IPendingRequestStore pendingRequestStore, + ICurrentTimeProvider currentTimeProvider, + IMessageSerializer headerSerializer) : base( loggerFactory, [requestResponseSettings], @@ -19,8 +28,9 @@ public KafkaPartitionConsumerForResponses(ILoggerFactory loggerFactory, RequestR messageProcessor: new ResponseMessageProcessor( loggerFactory, requestResponseSettings, - responseConsumer, - messagePayloadProvider: m => m.Message.Value)) + messageProvider, + pendingRequestStore, + currentTimeProvider)) { } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs index a50c977e..69ce65ea 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs @@ -67,7 +67,11 @@ void AddGroupConsumer(string group, IReadOnlyCollection topics, Func new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, this, HeaderSerializer); + object MessageProvider(Type messageType, ConsumeResult transportMessage) + => Serializer.Deserialize(messageType, transportMessage.Message.Value); + + IKafkaPartitionConsumer ResponseProcessorFactory(TopicPartition tp, IKafkaCommitController cc) + => new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, MessageProvider, PendingRequestStore, CurrentTimeProvider, HeaderSerializer); foreach (var consumersByGroup in Settings.Consumers.GroupBy(x => x.GetGroup())) { @@ -75,7 +79,8 @@ void AddGroupConsumer(string group, IReadOnlyCollection topics, Func x.Path).ToDictionary(x => x.Key, x => x.ToArray()); var topics = consumersByTopic.Keys.ToList(); - IKafkaPartitionConsumer ConsumerProcessorFactory(TopicPartition tp, IKafkaCommitController cc) => new KafkaPartitionConsumerForConsumers(LoggerFactory, consumersByTopic[tp.Topic], group, tp, cc, HeaderSerializer, this); + IKafkaPartitionConsumer ConsumerProcessorFactory(TopicPartition tp, IKafkaCommitController cc) + => new KafkaPartitionConsumerForConsumers(LoggerFactory, consumersByTopic[tp.Topic], group, tp, cc, HeaderSerializer, this); var processorFactory = ConsumerProcessorFactory; diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs index a876e219..24cc1cb8 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs @@ -77,8 +77,9 @@ void AddTopicConsumer(string topic, IMessageProcessor me var processor = new ResponseMessageProcessor( LoggerFactory, Settings.RequestResponse, - responseConsumer: this, - messagePayloadProvider: m => m.PayloadSegment.Array); + messageProvider: MessageProvider, + PendingRequestStore, + CurrentTimeProvider); AddTopicConsumer(Settings.RequestResponse.Path, processor); } diff --git a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs index 79af07d8..3ea6254a 100644 --- a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs +++ b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs @@ -23,7 +23,7 @@ public NatsMessageBus(MessageBusSettings settings, NatsMessageBusSettings provid protected override void Build() { base.Build(); - AddInit(CreateConnectionAsync()); + InitTaskList.Add(CreateConnectionAsync, CancellationToken); } private Task CreateConnectionAsync() @@ -56,12 +56,14 @@ protected override async Task CreateConsumers() await base.CreateConsumers(); + object MessageProvider(Type messageType, NatsMsg transportMessage) => Serializer.Deserialize(messageType, transportMessage.Data); + foreach (var (subject, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList())) { var processor = new MessageProcessor>( consumerSettings, messageBus: this, - messageProvider: (type, message) => Serializer.Deserialize(type, message.Data), + messageProvider: MessageProvider, subject, this, consumerErrorHandlerOpenGenericType: typeof(INatsConsumerErrorHandler<>)); @@ -71,7 +73,7 @@ protected override async Task CreateConsumers() if (Settings.RequestResponse != null) { - var processor = new ResponseMessageProcessor>(LoggerFactory, Settings.RequestResponse, this, message => message.Data); + var processor = new ResponseMessageProcessor>(LoggerFactory, Settings.RequestResponse, MessageProvider, PendingRequestStore, CurrentTimeProvider); AddSubjectConsumer(Settings.RequestResponse.Path, processor); } } diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs index 7698a37a..160cf0aa 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs @@ -270,7 +270,7 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxM { var busName = busGroup.Key; var bus = GetBus(compositeMessageBus, messageBusTarget, busName); - if (bus == null || bus is not ITransportBulkProducer bulkProducer) + if (bus is not ITransportBulkProducer bulkProducer) { foreach (var outboxMessage in busGroup) { @@ -285,7 +285,6 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxM abortedIds.Add(outboxMessage.Id); } - continue; } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs index 2040822c..52ec4eb4 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs @@ -11,7 +11,15 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode; - public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, IList consumers, IMessageSerializer serializer, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter) + public RabbitMqConsumer( + ILoggerFactory loggerFactory, + IRabbitMqChannel channel, + string queueName, + IList consumers, + IMessageSerializer serializer, + MessageBusBase messageBus, + MessageProvider messageProvider, + IHeaderValueConverter headerValueConverter) : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) { _acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null) @@ -21,7 +29,7 @@ public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, messageBus, path: queueName, responseProducer: messageBus, - messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()), + messageProvider: messageProvider, consumerContextInitializer: InitializeConsumerContext, consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>)); } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs index a40bc40e..5a8366e4 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs @@ -6,10 +6,18 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; - public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter) + public RabbitMqResponseConsumer( + ILoggerFactory loggerFactory, + IRabbitMqChannel channel, + string queueName, + RequestResponseSettings requestResponseSettings, + MessageProvider messageProvider, + IPendingRequestStore pendingRequestStore, + ICurrentTimeProvider currentTimeProvider, + IHeaderValueConverter headerValueConverter) : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) { - _messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageBus, m => m.Body.ToArray()); + _messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider); } protected override async Task OnMessageReceived(Dictionary messageHeaders, BasicDeliverEventArgs transportMessage) diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs index e8207191..278d408a 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs @@ -28,13 +28,15 @@ protected override void Build() { base.Build(); - AddInit(CreateConnection()); + InitTaskList.Add(CreateConnection, CancellationToken); } protected override async Task CreateConsumers() { await base.CreateConsumers(); + object MessageProvider(Type messageType, BasicDeliverEventArgs transportMessage) => Serializer.Deserialize(messageType, transportMessage.Body.ToArray()); + foreach (var (queueName, consumers) in Settings.Consumers.GroupBy(x => x.GetQueueName()).ToDictionary(x => x.Key, x => x.ToList())) { AddConsumer(new RabbitMqConsumer(LoggerFactory, @@ -43,6 +45,7 @@ protected override async Task CreateConsumers() consumers, Serializer, messageBus: this, + MessageProvider, ProviderSettings.HeaderValueConverter)); } @@ -52,7 +55,9 @@ protected override async Task CreateConsumers() channel: this, queueName: Settings.RequestResponse.GetQueueName(), Settings.RequestResponse, - this, + MessageProvider, + PendingRequestStore, + CurrentTimeProvider, ProviderSettings.HeaderValueConverter)); } } diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index 29ba4efd..eef314c3 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -141,11 +141,11 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor(LoggerFactory, Settings.RequestResponse, this, messagePayloadProvider: m => m.Payload)); + AddTopicConsumer(Settings.RequestResponse.Path, subscriber, new ResponseMessageProcessor(LoggerFactory, Settings.RequestResponse, MessageProvider, PendingRequestStore, CurrentTimeProvider)); } else { - queues.Add((Settings.RequestResponse.Path, new ResponseMessageProcessor(LoggerFactory, Settings.RequestResponse, this, messagePayloadProvider: m => m.Payload))); + queues.Add((Settings.RequestResponse.Path, new ResponseMessageProcessor(LoggerFactory, Settings.RequestResponse, MessageProvider, PendingRequestStore, CurrentTimeProvider))); } } diff --git a/src/SlimMessageBus.Host.Serialization.Json/JsonMessageSerializer.cs b/src/SlimMessageBus.Host.Serialization.Json/JsonMessageSerializer.cs index 2aa0356e..c3998f70 100644 --- a/src/SlimMessageBus.Host.Serialization.Json/JsonMessageSerializer.cs +++ b/src/SlimMessageBus.Host.Serialization.Json/JsonMessageSerializer.cs @@ -7,7 +7,7 @@ namespace SlimMessageBus.Host.Serialization.Json; using Newtonsoft.Json; -public class JsonMessageSerializer : IMessageSerializer +public class JsonMessageSerializer : IMessageSerializer, IMessageSerializer { private readonly ILogger _logger; private readonly Encoding _encoding; @@ -31,9 +31,7 @@ public byte[] Serialize(Type t, object message) { var jsonPayload = JsonConvert.SerializeObject(message, t, _serializerSettings); _logger.LogDebug("Type {MessageType} serialized from {Message} to JSON {MessageJson}", t, message, jsonPayload); - - var payload = _encoding.GetBytes(jsonPayload); - return payload; + return _encoding.GetBytes(jsonPayload); } public object Deserialize(Type t, byte[] payload) @@ -42,16 +40,32 @@ public object Deserialize(Type t, byte[] payload) try { jsonPayload = _encoding.GetString(payload); - var message = JsonConvert.DeserializeObject(jsonPayload, t, _serializerSettings); - _logger.LogDebug("Type {MessageType} deserialized from JSON {MessageJson} to {Message}", t, jsonPayload, message); - return message; + return Deserialize(t, jsonPayload); } catch (Exception e) { _logger.LogError(e, "Type {MessageType} could not been deserialized, payload: {MessagePayload}, JSON: {MessageJson}", t, _logger.IsEnabled(LogLevel.Debug) ? Convert.ToBase64String(payload) : "(...)", jsonPayload); throw; } - } - + } + + #endregion + + #region Implementation of IMessageSerializer + + string IMessageSerializer.Serialize(Type t, object message) + { + var payload = JsonConvert.SerializeObject(message, t, _serializerSettings); + _logger.LogDebug("Type {MessageType} serialized from {Message} to JSON {MessageJson}", t, message, payload); + return payload; + } + + public object Deserialize(Type t, string payload) + { + var message = JsonConvert.DeserializeObject(payload, t, _serializerSettings); + _logger.LogDebug("Type {MessageType} deserialized from JSON {MessageJson} to {Message}", t, payload, message); + return message; + } + #endregion } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Serialization.Json/SerializationBuilderExtensions.cs b/src/SlimMessageBus.Host.Serialization.Json/SerializationBuilderExtensions.cs index 4f6c0bad..83502385 100644 --- a/src/SlimMessageBus.Host.Serialization.Json/SerializationBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Serialization.Json/SerializationBuilderExtensions.cs @@ -24,7 +24,10 @@ public static TBuilder AddJsonSerializer(this TBuilder builder, Encodi { builder.RegisterSerializer(services => { + // Add the implementation services.TryAddSingleton(svp => new JsonMessageSerializer(jsonSerializerSettings ?? svp.GetService(), encoding, svp.GetRequiredService>())); + // Add the serializer as IMessageSerializer + services.TryAddSingleton(svp => svp.GetRequiredService() as IMessageSerializer); }); return builder; } diff --git a/src/SlimMessageBus.Host.Serialization.SystemTextJson/JsonMessageSerializer.cs b/src/SlimMessageBus.Host.Serialization.SystemTextJson/JsonMessageSerializer.cs index e929909f..fab9cff9 100644 --- a/src/SlimMessageBus.Host.Serialization.SystemTextJson/JsonMessageSerializer.cs +++ b/src/SlimMessageBus.Host.Serialization.SystemTextJson/JsonMessageSerializer.cs @@ -6,7 +6,7 @@ /// /// Implementation of using . /// -public class JsonMessageSerializer : IMessageSerializer +public class JsonMessageSerializer : IMessageSerializer, IMessageSerializer { /// /// options for the JSON serializer. By default adds converter. @@ -30,9 +30,23 @@ public virtual JsonSerializerOptions CreateDefaultOptions() return options; } + #region Implementation of IMessageSerializer + public byte[] Serialize(Type t, object message) => JsonSerializer.SerializeToUtf8Bytes(message, t, Options); public object Deserialize(Type t, byte[] payload) => - JsonSerializer.Deserialize(payload, t, Options)!; + JsonSerializer.Deserialize(payload, t, Options)!; + + #endregion + + #region Implementation of IMessageSerializer + + string IMessageSerializer.Serialize(Type t, object message) + => JsonSerializer.Serialize(message, t, Options); + + object IMessageSerializer.Deserialize(Type t, string payload) + => JsonSerializer.Deserialize(payload, t, Options)!; + + #endregion } diff --git a/src/SlimMessageBus.Host.Serialization.SystemTextJson/SerializationBuilderExtensions.cs b/src/SlimMessageBus.Host.Serialization.SystemTextJson/SerializationBuilderExtensions.cs index e9af7d27..e8725b31 100644 --- a/src/SlimMessageBus.Host.Serialization.SystemTextJson/SerializationBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Serialization.SystemTextJson/SerializationBuilderExtensions.cs @@ -19,7 +19,10 @@ public static TBuilder AddJsonSerializer(this TBuilder builder, JsonSe { builder.RegisterSerializer(services => { + // Add the implementation services.TryAddSingleton(svp => new JsonMessageSerializer(options ?? svp.GetService())); + // Add the serializer as IMessageSerializer + services.TryAddSingleton(svp => svp.GetRequiredService() as IMessageSerializer); }); return builder; } diff --git a/src/SlimMessageBus.Host.Serialization/IMessageSerializer.cs b/src/SlimMessageBus.Host.Serialization/IMessageSerializer.cs index 191719f4..060ee3c5 100644 --- a/src/SlimMessageBus.Host.Serialization/IMessageSerializer.cs +++ b/src/SlimMessageBus.Host.Serialization/IMessageSerializer.cs @@ -1,7 +1,18 @@ namespace SlimMessageBus.Host.Serialization; -public interface IMessageSerializer +/// +/// Serializer for messages into byte[]. +/// +public interface IMessageSerializer : IMessageSerializer { - byte[] Serialize(Type t, object message); - object Deserialize(Type t, byte[] payload); +} + +/// +/// Serializer for messages into the given payload type (byte[] etc). +/// +/// +public interface IMessageSerializer +{ + TPayload Serialize(Type t, object message); + object Deserialize(Type t, TPayload payload); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs b/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs index 305bd7a8..ecb5a577 100644 --- a/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs +++ b/src/SlimMessageBus.Host.Sql/SqlMessageBus.cs @@ -11,7 +11,7 @@ protected override void Build() { base.Build(); - AddInit(ProvisionTopology()); + InitTaskList.Add(ProvisionTopology, CancellationToken); } public override async Task ProvisionTopology() diff --git a/src/SlimMessageBus.Host/Collections/AsyncTaskList.cs b/src/SlimMessageBus.Host/Collections/AsyncTaskList.cs new file mode 100644 index 00000000..41b70d8a --- /dev/null +++ b/src/SlimMessageBus.Host/Collections/AsyncTaskList.cs @@ -0,0 +1,58 @@ +namespace SlimMessageBus.Host.Collections; + +public interface IAsyncTaskList +{ + void Add(Func taskFactory, CancellationToken cancellationToken); + Task EnsureAllFinished(); +} + +/// +/// Tracks a list of Tasks that have to be awaited before we can proceed. +/// +public class AsyncTaskList : IAsyncTaskList +{ + private readonly object _currentTaskLock = new(); + private Task _currentTask = null; + + public void Add(Func taskFactory, CancellationToken cancellationToken) + { + static async Task AddNext(Task prevTask, Func taskFactory) + { + await prevTask; + await taskFactory(); + } + + lock (_currentTaskLock) + { + var prevTask = _currentTask; + _currentTask = prevTask != null + ? AddNext(prevTask, taskFactory) + : taskFactory(); + } + } + + + /// + /// Awaits (if any) bus intialization tasks (e.g. topology provisining) before we can produce message into the bus (or consume messages). + /// + /// + public async Task EnsureAllFinished() + { + var initTask = _currentTask; + if (initTask != null) + { + await initTask.ConfigureAwait(false); + + lock (_currentTaskLock) + { + // Clear if await finished and the current task chain was the one we awaited + if (ReferenceEquals(_currentTask, initTask)) + { + _currentTask = null; + } + } + } + } + + +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/Delegates.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/Delegates.cs index dfe47783..c6975bb3 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/Delegates.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/Delegates.cs @@ -4,9 +4,10 @@ /// Provides the message payload (binary) from the transport message. /// /// +/// /// /// -public delegate byte[] MessagePayloadProvider(T transportMessage); +public delegate TPayload MessagePayloadProvider(T transportMessage); /// /// Initializes the consumer context from the specified transport message. diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/IResponseConsumer.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/IResponseConsumer.cs deleted file mode 100644 index eace101f..00000000 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/IResponseConsumer.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace SlimMessageBus.Host; - -public interface IResponseConsumer -{ - Task OnResponseArrived(byte[] responsePayload, string path, IReadOnlyDictionary responseHeaders); -} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index e5f8fb5d..10242fc2 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -1,44 +1,121 @@ namespace SlimMessageBus.Host; +public abstract class ResponseMessageProcessor +{ +} + /// /// The implementation that processes the responses arriving to the bus. /// -/// -public class ResponseMessageProcessor : IMessageProcessor +/// +public class ResponseMessageProcessor : ResponseMessageProcessor, IMessageProcessor { - private readonly ILogger> _logger; + private readonly ILogger _logger; private readonly RequestResponseSettings _requestResponseSettings; - private readonly IResponseConsumer _responseConsumer; private readonly IReadOnlyCollection _consumerSettings; - private readonly MessagePayloadProvider _messagePayloadProvider; + private readonly MessageProvider _messageProvider; + private readonly IPendingRequestStore _pendingRequestStore; + private readonly ICurrentTimeProvider _currentTimeProvider; - public ResponseMessageProcessor(ILoggerFactory loggerFactory, RequestResponseSettings requestResponseSettings, IResponseConsumer responseConsumer, MessagePayloadProvider messagePayloadProvider) + public ResponseMessageProcessor(ILoggerFactory loggerFactory, + RequestResponseSettings requestResponseSettings, + MessageProvider messageProvider, + IPendingRequestStore pendingRequestStore, + ICurrentTimeProvider currentTimeProvider) { if (loggerFactory is null) throw new ArgumentNullException(nameof(loggerFactory)); - _logger = loggerFactory.CreateLogger>(); + _logger = loggerFactory.CreateLogger(); _requestResponseSettings = requestResponseSettings ?? throw new ArgumentNullException(nameof(requestResponseSettings)); - _responseConsumer = responseConsumer ?? throw new ArgumentNullException(nameof(responseConsumer)); - _consumerSettings = new List { _requestResponseSettings }; - _messagePayloadProvider = messagePayloadProvider ?? throw new ArgumentNullException(nameof(messagePayloadProvider)); + _consumerSettings = [_requestResponseSettings]; + _messageProvider = messageProvider ?? throw new ArgumentNullException(nameof(messageProvider)); + _pendingRequestStore = pendingRequestStore; + _currentTimeProvider = currentTimeProvider; } public IReadOnlyCollection ConsumerSettings => _consumerSettings; - public async Task ProcessMessage(TMessage transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) + public Task ProcessMessage(TTransportMessage transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) { + Exception ex; try { - var messagePayload = _messagePayloadProvider(transportMessage); - var exception = await _responseConsumer.OnResponseArrived(messagePayload, _requestResponseSettings.Path, messageHeaders); - return new(exception, _requestResponseSettings, null); + ex = OnResponseArrived(transportMessage, _requestResponseSettings.Path, messageHeaders); } catch (Exception e) { _logger.LogError(e, "Error occurred while consuming response message, {Message}", transportMessage); - // We can only continue and process all messages in the lease - return new(e, _requestResponseSettings, null); + ex = e; } + return Task.FromResult(new ProcessMessageResult(ex, _requestResponseSettings, null)); } + + /// + /// Should be invoked by the concrete bus implementation whenever there is a message arrived on the reply to topic. + /// + /// The response message + /// + /// The response message headers + /// + private Exception OnResponseArrived(TTransportMessage transportMessage, string path, IReadOnlyDictionary responseHeaders) + { + if (!responseHeaders.TryGetHeader(ReqRespMessageHeaders.RequestId, out string requestId)) + { + return new ConsumerMessageBusException($"The response message arriving on path {path} did not have the {ReqRespMessageHeaders.RequestId} header. Unable to math the response with the request. This likely indicates a misconfiguration."); + } + + var requestState = _pendingRequestStore.GetById(requestId); + if (requestState == null) + { + _logger.LogDebug("The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).", requestId, path); + // ToDo: add and API hook to these kind of situation + return null; + } + + try + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + var tookTimespan = _currentTimeProvider.CurrentTime.Subtract(requestState.Created); + _logger.LogDebug("Response arrived for {Request} on path {Path} (time: {RequestTime} ms)", requestState, path, tookTimespan); + } + + if (responseHeaders.TryGetHeader(ReqRespMessageHeaders.Error, out string errorMessage)) + { + // error response arrived + + var responseException = new RequestHandlerFaultedMessageBusException(errorMessage); + _logger.LogDebug(responseException, "Response arrived for {Request} on path {Path} with error: {ResponseError}", requestState, path, responseException.Message); + requestState.TaskCompletionSource.TrySetException(responseException); + } + else + { + // response arrived + try + { + // deserialize the response message + var response = transportMessage != null + ? _messageProvider(requestState.ResponseType, transportMessage) + : null; + + // resolve the response + requestState.TaskCompletionSource.TrySetResult(response); + } + catch (Exception e) + { + _logger.LogDebug(e, "Could not deserialize the response message for {Request} arriving on path {Path}", requestState, path); + requestState.TaskCompletionSource.TrySetException(e); + } + } + } + finally + { + // remove the request from the queue + _pendingRequestStore.Remove(requestId); + } + + return null; + } + } diff --git a/src/SlimMessageBus.Host/Helpers/CompatMethods.cs b/src/SlimMessageBus.Host/Helpers/CompatMethods.cs index 3407478f..145d8865 100644 --- a/src/SlimMessageBus.Host/Helpers/CompatMethods.cs +++ b/src/SlimMessageBus.Host/Helpers/CompatMethods.cs @@ -23,7 +23,34 @@ public static bool TryAdd(this IDictionary dict, K key, V value) return false; } - public static HashSet ToHashSet(this IEnumerable items) => new HashSet(items); + public static HashSet ToHashSet(this IEnumerable items) => new(items); + +#if NETSTANDARD2_0 + + public static IEnumerable> Chunk(this IEnumerable items, int size) + { + var chunk = new List(size); + + foreach (var item in items) + { + if (chunk.Count < size) + { + chunk.Add(item); + } + else + { + yield return chunk; + chunk = new List(size); + } + } + + if (chunk.Count > 0) + { + yield return chunk; + } + } + +#endif } public static class TimeSpanExtensions diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index d023c2ff..e5e99498 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -5,23 +5,18 @@ namespace SlimMessageBus.Host; using SlimMessageBus.Host.Consumer; using SlimMessageBus.Host.Services; - -public abstract class MessageBusBase : MessageBusBase where TProviderSettings : class + +public abstract class MessageBusBase(MessageBusSettings settings, TProviderSettings providerSettings) : MessageBusBase(settings) + where TProviderSettings : class { - public TProviderSettings ProviderSettings { get; } - - protected MessageBusBase(MessageBusSettings settings, TProviderSettings providerSettings) : base(settings) - { - ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); - } -} + public TProviderSettings ProviderSettings { get; } = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); +} public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMessageBus, IMessageScopeFactory, IMessageHeadersFactory, IResponseProducer, - IResponseConsumer, ITransportProducer, ITransportBulkProducer { @@ -30,6 +25,7 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, private IMessageSerializer _serializer; private readonly MessageHeaderService _headerService; private readonly List _consumers = []; + public ILoggerFactory LoggerFactory { get; protected set; } /// /// Special market reference that signifies a dummy producer settings for response types. @@ -38,8 +34,6 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, public RuntimeTypeCache RuntimeTypeCache { get; } - public ILoggerFactory LoggerFactory { get; } - public virtual MessageBusSettings Settings { get; } public virtual IMessageSerializer Serializer => _serializer ??= GetSerializer(); @@ -64,9 +58,14 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, protected bool IsDisposed { get; private set; } #endregion - - private readonly object _initTaskLock = new(); - private Task _initTask = null; + + /// + /// Maintains a list of tasks that should be completed before the bus can produce the first message or start consumers. + /// Add async things like + /// - connection creations here to the underlying transport client + /// - provision topology + /// + protected readonly AsyncTaskList InitTaskList = new(); #region Start & Stop @@ -110,36 +109,6 @@ protected MessageBusBase(MessageBusSettings settings) PendingRequestStore = PendingRequestManager.Store; } - protected void AddInit(Task task) - { - lock (_initTaskLock) - { - var prevInitTask = _initTask; - _initTask = prevInitTask?.ContinueWith(_ => task, CancellationToken) ?? task; - } - } - - /// - /// Awaits (if any) bus intialization (e.g. topology provisining) before we can produce message into the bus. - /// - /// - protected async Task EnsureInitFinished() - { - var initTask = _initTask; - if (initTask != null) - { - await initTask.ConfigureAwait(false); - - lock (_initTaskLock) - { - if (ReferenceEquals(_initTask, initTask)) - { - _initTask = null; - } - } - } - } - protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider); protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); } @@ -154,7 +123,7 @@ protected void OnBuildProvider() Build(); // Notify the bus has been created - before any message can be produced - AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created)); + InitTaskList.Add(() => OnBusLifecycle(MessageBusLifecycleEventType.Created), CancellationToken); // Auto start consumers if enabled if (Settings.AutoStartConsumers) @@ -222,7 +191,7 @@ public async Task Start() try { - await EnsureInitFinished(); + await InitTaskList.EnsureAllFinished(); _logger.LogInformation("Starting consumers for {BusName} bus...", Name); await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false); @@ -261,7 +230,7 @@ public async Task Stop() try { - await EnsureInitFinished(); + await InitTaskList.EnsureAllFinished(); _logger.LogInformation("Stopping consumers for {BusName} bus...", Name); await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false); @@ -450,7 +419,7 @@ public async virtual Task ProducePublish(object message, string path = null, IDi { if (message == null) throw new ArgumentNullException(nameof(message)); AssertActive(); - await EnsureInitFinished(); + await InitTaskList.EnsureAllFinished(); // check if the cancellation was already requested cancellationToken.ThrowIfCancellationRequested(); @@ -548,8 +517,8 @@ public virtual async Task ProduceSend(object request, stri { if (request == null) throw new ArgumentNullException(nameof(request)); AssertActive(); - AssertRequestResponseConfigured(); - await EnsureInitFinished(); + AssertRequestResponseConfigured(); + await InitTaskList.EnsureAllFinished(); // check if the cancellation was already requested cancellationToken.ThrowIfCancellationRequested(); @@ -669,78 +638,6 @@ public virtual Task ProduceResponse(string requestId, object request, IReadOnlyD return ProduceToTransport(response, responseType, (string)replyTo, responseHeaders, null, cancellationToken); } - /// - /// Should be invoked by the concrete bus implementation whenever there is a message arrived on the reply to topic. - /// - /// - /// - /// - public virtual Task OnResponseArrived(byte[] responsePayload, string path, IReadOnlyDictionary responseHeaders) - { - if (!responseHeaders.TryGetHeader(ReqRespMessageHeaders.RequestId, out string requestId)) - { - _logger.LogError("The response message arriving on path {Path} did not have the {HeaderName} header. Unable to math the response with the request. This likely indicates a misconfiguration.", path, ReqRespMessageHeaders.RequestId); - return Task.FromResult(null); - } - - Exception responseException = null; - if (responseHeaders.TryGetHeader(ReqRespMessageHeaders.Error, out string errorMessage)) - { - responseException = new RequestHandlerFaultedMessageBusException(errorMessage); - } - - var requestState = PendingRequestStore.GetById(requestId); - if (requestState == null) - { - _logger.LogDebug("The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).", requestId, path); - - // ToDo: add and API hook to these kind of situation - return Task.FromResult(null); - } - - try - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - var tookTimespan = CurrentTimeProvider.CurrentTime.Subtract(requestState.Created); - _logger.LogDebug("Response arrived for {Request} on path {Path} (time: {RequestTime} ms)", requestState, path, tookTimespan); - } - - if (responseException != null) - { - // error response arrived - _logger.LogDebug(responseException, "Response arrived for {Request} on path {Path} with error: {ResponseError}", requestState, path, responseException.Message); - - requestState.TaskCompletionSource.TrySetException(responseException); - } - else - { - // response arrived - try - { - // deserialize the response message - var response = responsePayload != null - ? Serializer.Deserialize(requestState.ResponseType, responsePayload) - : null; - - // resolve the response - requestState.TaskCompletionSource.TrySetResult(response); - } - catch (Exception e) - { - _logger.LogDebug(e, "Could not deserialize the response message for {Request} arriving on path {Path}", requestState, path); - requestState.TaskCompletionSource.TrySetException(e); - } - } - } - finally - { - // remove the request from the queue - PendingRequestStore.Remove(requestId); - } - return Task.FromResult(null); - } - /// /// Generates unique request IDs /// diff --git a/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs b/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs index f6ee7774..a4731927 100644 --- a/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs +++ b/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs @@ -7,7 +7,6 @@ public class PendingRequestManager : IPendingRequestManager, IDisposable { private readonly ILogger _logger; - private readonly TimeSpan _timerInterval; private readonly Timer _timer; private readonly object _timerSync = new(); @@ -23,9 +22,10 @@ public PendingRequestManager(IPendingRequestStore store, ICurrentTimeProvider ti Store = store; _onRequestTimeout = onRequestTimeout; - _timeProvider = timeProvider; - _timerInterval = interval ?? TimeSpan.FromSeconds(3); - _timer = new Timer(state => TimerCallback(), null, _timerInterval, _timerInterval); + _timeProvider = timeProvider; + + var timerInterval = interval ?? TimeSpan.FromSeconds(3); + _timer = new Timer(state => TimerCallback(), null, timerInterval, timerInterval); } #region IDisposable diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index 3cb22665..f96a2a9c 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -139,6 +139,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{CBE53E71-7 ..\CONTRIBUTING.md = ..\CONTRIBUTING.md ..\docs\intro.md = ..\docs\intro.md ..\docs\NuGet.md = ..\docs\NuGet.md + ..\docs\provider_amazon_sqs.md = ..\docs\provider_amazon_sqs.md ..\docs\provider_azure_eventhubs.md = ..\docs\provider_azure_eventhubs.md ..\docs\provider_azure_servicebus.md = ..\docs\provider_azure_servicebus.md ..\docs\provider_hybrid.md = ..\docs\provider_hybrid.md @@ -278,6 +279,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AspNetC EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Artifacts", "Artifacts", "{0F4AD1B7-157D-4ABC-A379-68BF207F2FC3}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AmazonSQS", "SlimMessageBus.Host.AmazonSQS\SlimMessageBus.Host.AmazonSQS.csproj", "{4DF4BC7C-5EE3-4310-BC40-054C1494444E}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AmazonSQS.Test", "Tests\SlimMessageBus.Host.AmazonSQS.Test\SlimMessageBus.Host.AmazonSQS.Test.csproj", "{9255A33D-9697-4E69-9418-AD31656FF8AC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -846,6 +851,22 @@ Global {9FCBF788-1F0C-43E2-909D-1F96B2685F38}.Release|Any CPU.Build.0 = Release|Any CPU {9FCBF788-1F0C-43E2-909D-1F96B2685F38}.Release|x86.ActiveCfg = Release|Any CPU {9FCBF788-1F0C-43E2-909D-1F96B2685F38}.Release|x86.Build.0 = Release|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Debug|x86.ActiveCfg = Debug|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Debug|x86.Build.0 = Debug|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|Any CPU.Build.0 = Release|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|x86.ActiveCfg = Release|Any CPU + {4DF4BC7C-5EE3-4310-BC40-054C1494444E}.Release|x86.Build.0 = Release|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|x86.ActiveCfg = Debug|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Debug|x86.Build.0 = Debug|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|Any CPU.Build.0 = Release|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.ActiveCfg = Release|Any CPU + {9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -932,6 +953,8 @@ Global {46C40625-D1AC-4EA1-9562-4F1837D417CE} = {A5B15524-93B8-4CCE-AC6D-A22984498BA0} {5250E48D-36C7-4214-8D7E-5924A9E337C6} = {59F88FB5-6D19-4520-87E8-227B3539BBB3} {9FCBF788-1F0C-43E2-909D-1F96B2685F38} = {9F005B5C-A856-4351-8C0C-47A8B785C637} + {4DF4BC7C-5EE3-4310-BC40-054C1494444E} = {9291D340-B4FA-44A3-8060-C14743FB1712} + {9255A33D-9697-4E69-9418-AD31656FF8AC} = {9F005B5C-A856-4351-8C0C-47A8B785C637} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80} diff --git a/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/GlobalUsings.cs new file mode 100644 index 00000000..b9eb4462 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/GlobalUsings.cs @@ -0,0 +1,12 @@ +global using FluentAssertions; + +global using Microsoft.Extensions.Configuration; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; + +global using SecretStore; + +global using SlimMessageBus.Host.Test.Common.IntegrationTest; + +global using Xunit; +global using Xunit.Abstractions; diff --git a/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/Headers/DefaultSqsHeaderSerializerTest.cs b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/Headers/DefaultSqsHeaderSerializerTest.cs new file mode 100644 index 00000000..daf379a3 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/Headers/DefaultSqsHeaderSerializerTest.cs @@ -0,0 +1,32 @@ +namespace SlimMessageBus.Host.AmazonSQS.Test; + +public class DefaultSqsHeaderSerializerTest +{ + public static readonly TheoryData Data = new() + { + { null, null }, + { 10, 10 }, + { false, false }, + { true, true }, + { "string", "string" }, + { DateTime.Now.Date, DateTime.Now.Date }, + { Guid.Parse("{529194F3-AEAA-497D-A495-C84DD67C2DDA}"), Guid.Parse("{529194F3-AEAA-497D-A495-C84DD67C2DDA}") }, + { Guid.Empty, Guid.Empty }, + }; + + [Theory] + [MemberData(nameof(Data))] + public void When_Serialize_Given_VariousValueTypes_Then_RestoresTheValue(object value, object expectedValue) + { + // arrange + var serializer = new DefaultSqsHeaderSerializer(); + var key = "key"; + + // act + var result = serializer.Serialize(key, value); + var resultValue = serializer.Deserialize(key, result); + + // assert + resultValue.Should().Be(expectedValue); + } +} diff --git a/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SlimMessageBus.Host.AmazonSQS.Test.csproj b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SlimMessageBus.Host.AmazonSQS.Test.csproj new file mode 100644 index 00000000..87b829ac --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SlimMessageBus.Host.AmazonSQS.Test.csproj @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + Always + + + PreserveNewest + + + + diff --git a/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs new file mode 100644 index 00000000..090b79f0 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs @@ -0,0 +1,327 @@ +namespace SlimMessageBus.Host.AmazonSQS.Test; + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Runtime.CompilerServices; + +using Amazon.SQS.Model; + +using Microsoft.Extensions.Logging; + +using SlimMessageBus.Host.Serialization.SystemTextJson; + +/// +/// Runs the integration tests for the . +/// Notice that this test needs to run against a real Amazon SQS infrastructure. +/// Inside the GitHub Actions pipeline, the Amazon SQS infrastructure is shared, and this tests attempts to isolate itself by using unique queue names. +/// +[Trait("Category", "Integration")] +[Trait("Transport", "AmazonSQS")] +public class SqsMessageBusIt(ITestOutputHelper output) : BaseIntegrationTest(output) +{ + private const int NumberOfMessages = 100; + private const string QueueNamePrefix = "SMB"; + private const string CreatedDateTag = "CreatedDate"; + + protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) + { + var today = DateTime.UtcNow.Date.ToString("o"); + + services.AddSingleton>(); + + services.AddSlimMessageBus((mbb) => + { + mbb.AddServicesFromAssemblyContaining(); + mbb.AddJsonSerializer(); + ApplyBusConfiguration(mbb); + }); + + void AdditionalSqsSetup(SqsMessageBusSettings cfg) + { + cfg.TopologyProvisioning.CreateQueueOptions = opts => + { + // Tag the queue with the creation date + opts.Tags.Add(CreatedDateTag, today); + }; + cfg.TopologyProvisioning.OnProvisionTopology = async (client, provision) => + { + // Remove all older test queues (SQS does not support queue auto deletion) + var r = await client.ListQueuesAsync(QueueNamePrefix); + foreach (var queueUrl in r.QueueUrls) + { + var tagsResponse = await client.ListQueueTagsAsync(new ListQueueTagsRequest { QueueUrl = queueUrl }); + if (!tagsResponse.Tags.TryGetValue(CreatedDateTag, out var createdDateTag) || createdDateTag != today) + { + await client.DeleteQueueAsync(queueUrl); + } + } + await provision(); + }; + } + + var accessKey = Secrets.Service.PopulateSecrets(configuration["Amazon:AccessKey"]); + var secretAccessKey = Secrets.Service.PopulateSecrets(configuration["Amazon:SecretAccessKey"]); + + var roleArn = Secrets.Service.PopulateSecrets(configuration["Amazon:RoleArn"]); + var roleSessionName = Secrets.Service.PopulateSecrets(configuration["Amazon:RoleSessionName"]); + + // doc:fragment:ExampleSetup + services.AddSlimMessageBus((mbb) => + { + mbb.WithProviderAmazonSQS(cfg => + { + cfg.UseRegion(Amazon.RegionEndpoint.EUCentral1); + + // Use static credentials: https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html + cfg.UseCredentials(accessKey, secretAccessKey); + + // Use temporary credentials: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS + //cfg.UseTemporaryCredentials(roleArn, roleSessionName); + + AdditionalSqsSetup(cfg); + }); + }); + // doc:fragment:ExampleSetup + } + + public IMessageBus MessageBus => ServiceProvider.GetRequiredService(); + + [Theory] + [InlineData(false, true)] + [InlineData(true, true)] + [InlineData(false, false)] + [InlineData(true, false)] + public async Task BasicQueue(bool fifo, bool bulkProduce) + { + var queue = string.Concat(QueueName(), fifo ? ".fifo" : string.Empty); + AddBusConfiguration(mbb => + { + mbb + .Produce(x => + { + x.DefaultQueue(queue); + if (fifo) + { + x.EnableFifo(f => f + .DeduplicationId((m, h) => (m.Counter + 1000).ToString()) + .GroupId((m, h) => m.Counter % 2 == 0 ? "even" : "odd") + ); + } + }) + .Consume(x => x + .Queue(queue) + .WithConsumer() + .WithConsumer() + .Instances(20)); + }); + + await BasicProducerConsumer(1, bulkProduce: bulkProduce); + } + + public class TestData + { + public List ProducedMessages { get; set; } + public IReadOnlyCollection ConsumedMessages { get; set; } + } + + private async Task BasicProducerConsumer(int expectedMessageCopies, Action additionalAssertion = null, bool bulkProduce = false) + { + // arrange + var testMetric = ServiceProvider.GetRequiredService(); + var consumedMessages = ServiceProvider.GetRequiredService>(); + + var messageBus = MessageBus; + + // act + + // publish + var stopwatch = Stopwatch.StartNew(); + + var producedMessages = Enumerable + .Range(0, NumberOfMessages) + .Select(i => i % 2 == 0 ? new PingMessage(i) : new PingDerivedMessage(i)) + .ToList(); + + if (bulkProduce) + { + await messageBus.Publish(producedMessages); + } + else + { + foreach (var producedMessage in producedMessages) + { + // Send them in order + await messageBus.Publish(producedMessage); + } + } + + stopwatch.Stop(); + Logger.LogInformation("Published {Count} messages in {Elapsed}", producedMessages.Count, stopwatch.Elapsed); + + // consume + stopwatch.Restart(); + + await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5); + + stopwatch.Stop(); + + // assert + + // ensure number of instances of consumers created matches + var expectedConsumedCount = producedMessages.Count + producedMessages.OfType().Count(); + testMetric.CreatedConsumerCount.Should().Be(expectedConsumedCount * expectedMessageCopies); + consumedMessages.Count.Should().Be(expectedConsumedCount * expectedMessageCopies); + + // ... the content should match + foreach (var producedMessage in producedMessages) + { + var messageCopies = consumedMessages.Snapshot() + .Count(x => + x.Message.Counter == producedMessage.Counter + && x.Message.Value == producedMessage.Value + /*&& x.MessageId == GetMessageId(x.Message)*/); + messageCopies.Should().Be((producedMessage is PingDerivedMessage ? 2 : 1) * expectedMessageCopies); + } + + additionalAssertion?.Invoke(new TestData { ProducedMessages = producedMessages, ConsumedMessages = consumedMessages.Snapshot() }); + } + + [Fact] + public async Task BasicReqRespOnQueue() + { + var queue = QueueName(); + var responseQueue = $"{queue}-resp"; + + AddBusConfiguration(mbb => + { + mbb.Produce(x => + { + x.DefaultQueue(queue); + }) + .Handle(x => x.Queue(queue) + .WithHandler() + .Instances(20)) + .ExpectRequestResponses(x => + { + x.ReplyToQueue(responseQueue); + x.DefaultTimeout(TimeSpan.FromSeconds(60)); + }); + }); + await BasicReqResp(); + } + + private async Task BasicReqResp() + { + // arrange + var messageBus = MessageBus; + + // act + + // publish + var stopwatch = Stopwatch.StartNew(); + + var requests = Enumerable + .Range(0, NumberOfMessages) + .Select(i => new EchoRequest { Index = i, Message = $"Echo {i}" }) + .ToList(); + + var responses = new ConcurrentBag>(); + var responseTasks = requests.Select(async req => + { + var resp = await messageBus.Send(req).ConfigureAwait(false); + responses.Add(Tuple.Create(req, resp)); + }); + await Task.WhenAll(responseTasks).ConfigureAwait(false); + + stopwatch.Stop(); + Logger.LogInformation("Published and received {Count} messages in {Elapsed}", responses.Count, stopwatch.Elapsed); + + // assert + + // all messages got back + responses.Count.Should().Be(NumberOfMessages); + responses.All(x => x.Item1.Message == x.Item2.Message).Should().BeTrue(); + } + + private static string QueueName([CallerMemberName] string testName = null) + => $"{QueueNamePrefix}_{DateTimeOffset.UtcNow.Ticks}_{testName}"; +} + +public record TestEvent(PingMessage Message); + +public record PingMessage(int Counter) +{ + public Guid Value { get; set; } = Guid.NewGuid(); + public DateTime Timestamp { get; set; } = DateTime.UtcNow; +} + +public record PingDerivedMessage(int Counter) : PingMessage(Counter); + +public class PingConsumer : IConsumer, IConsumerWithContext +{ + private readonly ILogger _logger; + private readonly TestEventCollector _messages; + + public PingConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) + { + _logger = logger; + _messages = messages; + testMetric.OnCreatedConsumer(); + } + + public IConsumerContext Context { get; set; } + + public Task OnHandle(PingMessage message, CancellationToken cancellationToken) + { + var transportMessage = Context.GetTransportMessage(); + + _messages.Add(new(message)); + + _logger.LogInformation("Got message {Counter:000} on path {Path} message id {MessageId}.", message.Counter, Context.Path, transportMessage.MessageId); + return Task.CompletedTask; + } +} + +public class PingDerivedConsumer : IConsumer, IConsumerWithContext +{ + private readonly ILogger _logger; + private readonly TestEventCollector _messages; + + public PingDerivedConsumer(ILogger logger, TestEventCollector messages, TestMetric testMetric) + { + _logger = logger; + _messages = messages; + testMetric.OnCreatedConsumer(); + } + + public IConsumerContext Context { get; set; } + + public Task OnHandle(PingDerivedMessage message, CancellationToken cancellationToken) + { + var transportMessage = Context.GetTransportMessage(); + + _messages.Add(new(message)); + + _logger.LogInformation("Got message {Counter:000} on path {Path} message id {MessageId}.", message.Counter, Context.Path, transportMessage.MessageId); + return Task.CompletedTask; + } +} + +public record EchoRequest : IRequest +{ + public int Index { get; set; } + public string Message { get; set; } +} + +public record EchoResponse(string Message); + +public class EchoRequestHandler : IRequestHandler +{ + public EchoRequestHandler(TestMetric testMetric) + { + testMetric.OnCreatedConsumer(); + } + + public Task OnHandle(EchoRequest request, CancellationToken cancellationToken) + => Task.FromResult(new EchoResponse(request.Message)); +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/appsettings.json b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/appsettings.json new file mode 100644 index 00000000..f8ba05d9 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.AmazonSQS.Test/appsettings.json @@ -0,0 +1,15 @@ +{ + "Serilog": { + "MinimumLevel": { + "Default": "Information", + "Override": { + "SlimMessageBus": "Information", + "Microsoft": "Warning" + } + } + }, + "Amazon": { + "AccessKey": "{{amazon_access_key}}", + "SecretAccessKey": "{{amazon_secret_access_key}}" + } +} diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs index 562aec11..db9d3e56 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs @@ -476,7 +476,5 @@ public EchoRequestHandler(TestMetric testMetric) } public Task OnHandle(EchoRequest request, CancellationToken cancellationToken) - { - return Task.FromResult(new EchoResponse(request.Message)); - } + => Task.FromResult(new EchoResponse(request.Message)); } \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs index 53ec6a22..8139f91b 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs @@ -10,7 +10,9 @@ public class KafkaPartitionConsumerForResponsesTest : IDisposable private readonly TopicPartition _topicPartition; private readonly Mock _commitControllerMock = new(); private readonly Mock _checkpointTrigger = new(); - private KafkaPartitionConsumerForResponses _subject; + private KafkaPartitionConsumerForResponses _subject; + private readonly Mock> _messageProvider = new(); + private readonly Mock _pendingRequestStore = new(); public KafkaPartitionConsumerForResponsesTest() { @@ -30,7 +32,15 @@ public KafkaPartitionConsumerForResponsesTest() } }; - _subject = new KafkaPartitionConsumerForResponses(_messageBusMock.Bus.LoggerFactory, requestResponseSettings, requestResponseSettings.GetGroup(), _topicPartition, _commitControllerMock.Object, _messageBusMock.Bus, _messageBusMock.SerializerMock.Object) + _subject = new KafkaPartitionConsumerForResponses(_messageBusMock.Bus.LoggerFactory, + requestResponseSettings, + requestResponseSettings.GetGroup(), + _topicPartition, + _commitControllerMock.Object, + _messageProvider.Object, + _pendingRequestStore.Object, + _messageBusMock.CurrentTimeProvider, + new DefaultKafkaHeaderSerializer()) { CheckpointTrigger = _checkpointTrigger.Object }; @@ -75,14 +85,26 @@ public void When_OnPartitionAssigned_Then_ShouldResetTrigger() public async Task When_OnMessage_Given_SuccessMessage_ThenOnResponseArrived() { // arrange - var message = GetSomeMessage(); - _subject.OnPartitionAssigned(message.TopicPartition); + var requestId = "1"; + var request = new SomeMessage(); + var responseTransportMessage = GetSomeMessage(); + responseTransportMessage.Message.Headers.Add(ReqRespMessageHeaders.RequestId, Encoding.UTF8.GetBytes(requestId)); + var response = new SomeMessage(); + + _subject.OnPartitionAssigned(responseTransportMessage.TopicPartition); + + var pendingRequestState = new PendingRequestState(requestId, request, request.GetType(), response.GetType(), DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddHours(1), default); + _pendingRequestStore.Setup(x => x.GetById(requestId)).Returns(pendingRequestState); + + _messageProvider.Setup(x => x(response.GetType(), responseTransportMessage)).Returns(response); // act - await _subject.OnMessage(message); + await _subject.OnMessage(responseTransportMessage); - // assert - _messageBusMock.BusMock.Verify(x => x.OnResponseArrived(message.Message.Value, message.Topic, It.Is>(x => x.ContainsKey("test-header"))), Times.Once); + // assert + pendingRequestState.TaskCompletionSource.Task.IsCompleted.Should().BeTrue(); + var result = await pendingRequestState.TaskCompletionSource.Task; + result.Should().Be(response); } [Fact] diff --git a/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/JsonMessageSerializerTests.cs b/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/JsonMessageSerializerTests.cs index f19dea13..0a237e38 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/JsonMessageSerializerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/JsonMessageSerializerTests.cs @@ -15,9 +15,13 @@ public class JsonMessageSerializerTests [Guid.Empty, "00000000-0000-0000-0000-000000000000"], ]; + public JsonMessageSerializerTests() + { + } + [Theory] [MemberData(nameof(Data))] - public void When_SerializeAndDeserialize_Given_TypeObject_Then_TriesToInferPrimitiveTypes(object value, object expectedValue) + public void When_SerializeAndDeserialize_Given_TypeObjectAndBytesPayload_Then_TriesToInferPrimitiveTypes(object value, object expectedValue) { // arrange var subject = new JsonMessageSerializer(); @@ -29,4 +33,19 @@ public void When_SerializeAndDeserialize_Given_TypeObject_Then_TriesToInferPrimi // assert deserializedValue.Should().Be(expectedValue); } + + [Theory] + [MemberData(nameof(Data))] + public void When_SerializeAndDeserialize_Given_TypeObjectAndStringPayload_Then_TriesToInferPrimitiveTypes(object value, object expectedValue) + { + // arrange + var subject = new JsonMessageSerializer() as IMessageSerializer; + + // act + var json = subject.Serialize(typeof(object), value); + var deserializedValue = subject.Deserialize(typeof(object), json); + + // assert + deserializedValue.Should().Be(expectedValue); + } } \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SerializationBuilderExtensionsTest.cs b/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SerializationBuilderExtensionsTest.cs index c7305d61..a855004b 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SerializationBuilderExtensionsTest.cs +++ b/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SerializationBuilderExtensionsTest.cs @@ -16,6 +16,7 @@ public void When_AddJsonSerializer_Given_Builder_Then_ServicesRegistered() builder.PostConfigurationActions.ToList().ForEach(action => action(services)); services.Should().ContainSingle(x => x.ServiceType == typeof(IMessageSerializer)); + services.Should().ContainSingle(x => x.ServiceType == typeof(IMessageSerializer)); services.Should().ContainSingle(x => x.ServiceType == typeof(JsonMessageSerializer)); } } diff --git a/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/JsonMessageSerializerTests.cs b/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/JsonMessageSerializerTests.cs index 1f7fd888..b4af80c1 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/JsonMessageSerializerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/JsonMessageSerializerTests.cs @@ -1,6 +1,5 @@ namespace SlimMessageBus.Host.Serialization.SystemTextJson.Test; - public class JsonMessageSerializerTests { public static IEnumerable Data => @@ -16,7 +15,7 @@ public class JsonMessageSerializerTests [Theory] [MemberData(nameof(Data))] - public void When_SerializeAndDeserialize_Given_TypeObject_Then_TriesToInferPrimitiveTypes(object value, object expectedValue) + public void When_SerializeAndDeserialize_Given_TypeObjectAndBytesPayload_Then_TriesToInferPrimitiveTypes(object value, object expectedValue) { // arrange var subject = new JsonMessageSerializer(); @@ -29,6 +28,21 @@ public void When_SerializeAndDeserialize_Given_TypeObject_Then_TriesToInferPrimi deserializedValue.Should().Be(expectedValue); } + [Theory] + [MemberData(nameof(Data))] + public void When_SerializeAndDeserialize_Given_TypeObjectAndStringPayload_Then_TriesToInferPrimitiveTypes(object value, object expectedValue) + { + // arrange + var subject = new JsonMessageSerializer() as IMessageSerializer; + + // act + var json = subject.Serialize(typeof(object), value); + var deserializedValue = subject.Deserialize(typeof(object), json); + + // assert + deserializedValue.Should().Be(expectedValue); + } + [Theory] [InlineData(true)] [InlineData(false)] diff --git a/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SerializationBuilderExtensionsTest.cs b/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SerializationBuilderExtensionsTest.cs index 97a217f7..2863f76c 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SerializationBuilderExtensionsTest.cs +++ b/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SerializationBuilderExtensionsTest.cs @@ -16,6 +16,7 @@ public void When_AddJsonSerializer_Given_Builder_Then_ServicesRegistered() builder.PostConfigurationActions.ToList().ForEach(action => action(services)); services.Should().ContainSingle(x => x.ServiceType == typeof(IMessageSerializer)); + services.Should().ContainSingle(x => x.ServiceType == typeof(IMessageSerializer)); services.Should().ContainSingle(x => x.ServiceType == typeof(JsonMessageSerializer)); } } diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs index 88db4f39..9d5d1577 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs @@ -74,10 +74,7 @@ protected async Task EnsureConsumersStarted() while (!consumerControl.IsStarted && timeout.ElapsedMilliseconds < 5000) await Task.Delay(100); } - public Task InitializeAsync() - { - return Task.CompletedTask; - } + public Task InitializeAsync() => Task.CompletedTask; async Task IAsyncLifetime.DisposeAsync() { diff --git a/src/Tests/SlimMessageBus.Host.Test/Collections/AsyncTaskListTests.cs b/src/Tests/SlimMessageBus.Host.Test/Collections/AsyncTaskListTests.cs new file mode 100644 index 00000000..4a15d096 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test/Collections/AsyncTaskListTests.cs @@ -0,0 +1,36 @@ +namespace SlimMessageBus.Host.Test.Collections; + +using System.Collections.Concurrent; + +using SlimMessageBus.Host.Collections; + +public class AsyncTaskListTests +{ + [Fact] + public async Task Given_TaskAdded_When_EnsureAllFinished_Then_TaskIsCompleted() + { + // arrange + + var numberList = new ConcurrentQueue(); + + async Task RunTask(int n) + { + await Task.Delay(100); + numberList.Enqueue(n); + } + + var subject = new AsyncTaskList(); + subject.Add(() => RunTask(1), default); + subject.Add(() => RunTask(2), default); + + // act + await subject.EnsureAllFinished(); + + // assert + numberList.Should().HaveCount(2); + numberList.TryDequeue(out var n1).Should().BeTrue(); + n1.Should().Be(1); + numberList.TryDequeue(out var n2).Should().BeTrue(); + n2.Should().Be(2); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs index 20f63294..771866b8 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs @@ -3,7 +3,9 @@ public class MessageBusTested : MessageBusBase { internal int _startedCount; - internal int _stoppedCount; + internal int _stoppedCount; + + public IMessageProcessor RequestResponseMessageProcessor { get; private set; } public MessageBusTested(MessageBusSettings settings, ICurrentTimeProvider currentTimeProvider) : base(settings) @@ -12,8 +14,19 @@ public MessageBusTested(MessageBusSettings settings, ICurrentTimeProvider curren OnReply = (type, payload, req) => null; CurrentTimeProvider = currentTimeProvider; - OnBuildProvider(); + OnBuildProvider(); } + + protected override async Task CreateConsumers() + { + await base.CreateConsumers(); + + if (Settings.RequestResponse != null) + { + RequestResponseMessageProcessor = new ResponseMessageProcessor(LoggerFactory, Settings.RequestResponse, (mt, m) => m, PendingRequestStore, CurrentTimeProvider); + AddConsumer(new MessageBusTestedConsumer(NullLogger.Instance)); + } + } public ProducerSettings Public_GetProducerSettings(Type messageType) => GetProducerSettings(messageType); @@ -54,11 +67,10 @@ public override async Task ProduceToTransport(object message, Type messageType, messageHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out string replyTo); messageHeaders.TryGetHeader(ReqRespMessageHeaders.RequestId, out string requestId); - var responseHeaders = CreateHeaders(); + var responseHeaders = CreateHeaders() as Dictionary; responseHeaders.SetHeader(ReqRespMessageHeaders.RequestId, requestId); - var responsePayload = Serializer.Serialize(resp.GetType(), resp); - await OnResponseArrived(responsePayload, replyTo, (IReadOnlyDictionary)responseHeaders); + await RequestResponseMessageProcessor.ProcessMessage(resp, responseHeaders, null, null, cancellationToken); } } @@ -67,5 +79,12 @@ public override async Task ProduceToTransport(object message, Type messageType, public void TriggerPendingRequestCleanup() { PendingRequestManager.CleanPendingRequests(); + } + + public class MessageBusTestedConsumer(ILogger logger) : AbstractConsumer(logger) + { + protected override Task OnStart() => Task.CompletedTask; + + protected override Task OnStop() => Task.CompletedTask; } }