Skip to content

Commit

Permalink
Amazon.SQS transport
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 24, 2024
1 parent 0c9ce0c commit ed73daa
Show file tree
Hide file tree
Showing 23 changed files with 291 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected SqsBaseConsumer(

T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName, T defaultValue = default)
{
var set = consumerSettings.Select(x => selector(x)).Where(x => !x.Equals(defaultValue)).ToHashSet();
var set = consumerSettings.Select(x => selector(x)).Where(x => x is not null && !x.Equals(defaultValue)).ToHashSet();
if (set.Count > 1)
{
throw new ConfigurationMessageBusException($"All declared consumers across the same queue {path} must have the same {settingName} settings.");
Expand Down
26 changes: 11 additions & 15 deletions src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
namespace SlimMessageBus.Host.AmazonSQS;

using System.Text;
using SlimMessageBus.Host.Serialization;

public class SqsMessageBus : MessageBusBase<SqsMessageBusSettings>
{
private readonly ILogger _logger;
private readonly ISqsClientProvider _clientProvider;
private readonly Dictionary<string, string> _queueUrlByPath = [];
private readonly IMessageSerializer<string> _messageSerializer;

public SqsMessageBus(MessageBusSettings settings, SqsMessageBusSettings providerSettings)
: base(settings, providerSettings)
{
_logger = LoggerFactory.CreateLogger<SqsMessageBus>();
_clientProvider = settings.ServiceProvider.GetRequiredService<ISqsClientProvider>();
_messageSerializer = Serializer as IMessageSerializer<string>
?? throw new ConfigurationMessageBusException($"Serializer for Amazon SQS must be able to serialize into a string (it needs to implement {nameof(IMessageSerializer<string>)})");
OnBuildProvider();
}

Expand All @@ -31,15 +34,6 @@ protected override void Build()
InitTaskList.Add(InitAsync, CancellationToken);
}

private byte[] GetMessagePayload(Message message)
=> Encoding.UTF8.GetBytes(message.Body);

private string GetMessagePayloadString(Type messageType, object message)
{
var messagePayload = Serializer.Serialize(messageType, message);
return Encoding.UTF8.GetString(messagePayload);
}

protected override async Task CreateConsumers()
{
await base.CreateConsumers();
Expand All @@ -56,14 +50,16 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>

static void InitConsumerContext(Message m, ConsumerContext ctx) => ctx.SetTransportMessage(m);

object MessageProvider(Type messageType, Message transportMessage) => _messageSerializer.Deserialize(messageType, transportMessage.Body);

foreach (var ((path, pathKind), consumerSettings) in Settings.Consumers
.GroupBy(x => (x.Path, x.PathKind))
.ToDictionary(x => x.Key, x => x.ToList()))
{
var messageProcessor = new MessageProcessor<Message>(
consumerSettings,
this,
messageProvider: (messageType, m) => Serializer.Deserialize(messageType, GetMessagePayload(m)),
messageProvider: MessageProvider,
path: path,
responseProducer: this,
consumerContextInitializer: InitConsumerContext,
Expand All @@ -77,8 +73,9 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>
var messageProcessor = new ResponseMessageProcessor<Message>(
LoggerFactory,
Settings.RequestResponse,
responseConsumer: this,
messagePayloadProvider: GetMessagePayload);
messageProvider: MessageProvider,
PendingRequestStore,
CurrentTimeProvider);

AddConsumerFrom(
Settings.RequestResponse.Path,
Expand Down Expand Up @@ -217,8 +214,6 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu

private (string Payload, Dictionary<string, MessageAttributeValue> Attributes, string DeduplicationId, string GroupId) GetTransportMessage(object message, Type messageType, string path, IDictionary<string, object> messageHeaders)
{
var messagePayload = GetMessagePayloadString(messageType, message);

var producerSettings = GetProducerSettings(messageType);

var messageDeduplicationIdProvider = producerSettings.GetOrDefault(SqsProperties.MessageDeduplicationId, null);
Expand All @@ -243,6 +238,7 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
}
}

var messagePayload = _messageSerializer.Serialize(messageType, message);
return (messagePayload, messageAttributes, deduplicationId, groupId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class EhPartitionConsumerForConsumers : EhPartitionConsumer
{
private readonly IEnumerable<ConsumerSettings> _consumerSettings;

public EhPartitionConsumerForConsumers(EventHubMessageBus messageBus, IEnumerable<ConsumerSettings> consumerSettings, GroupPathPartitionId pathGroupPartition)
public EhPartitionConsumerForConsumers(EventHubMessageBus messageBus, IEnumerable<ConsumerSettings> consumerSettings, GroupPathPartitionId pathGroupPartition, MessageProvider<EventData> messageProvider)
: base(messageBus, pathGroupPartition)
{
_consumerSettings = consumerSettings ?? throw new ArgumentNullException(nameof(consumerSettings));
Expand All @@ -16,7 +16,7 @@ public EhPartitionConsumerForConsumers(EventHubMessageBus messageBus, IEnumerabl
MessageProcessor = new MessageProcessor<EventData>(
_consumerSettings,
MessageBus,
messageProvider: GetMessageFromTransportMessage,
messageProvider: messageProvider,
path: GroupPathPartition.ToString(),
responseProducer: MessageBus,
consumerContextInitializer: InitializeConsumerContext,
Expand All @@ -30,9 +30,6 @@ protected ICheckpointTrigger CreateCheckpointTrigger()
return f.Create(_consumerSettings);
}

private object GetMessageFromTransportMessage(Type messageType, EventData e)
=> MessageBus.Serializer.Deserialize(messageType, e.Body.ToArray());

private static void InitializeConsumerContext(EventData nativeMessage, ConsumerContext consumerContext)
=> consumerContext.SetTransportMessage(nativeMessage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@ namespace SlimMessageBus.Host.AzureEventHub;
/// </summary>
public class EhPartitionConsumerForResponses : EhPartitionConsumer
{
public EhPartitionConsumerForResponses(EventHubMessageBus messageBus, RequestResponseSettings requestResponseSettings, GroupPathPartitionId pathGroupPartition)
public EhPartitionConsumerForResponses(
EventHubMessageBus messageBus,
RequestResponseSettings requestResponseSettings,
GroupPathPartitionId pathGroupPartition,
MessageProvider<EventData> messageProvider,
IPendingRequestStore pendingRequestStore,
ICurrentTimeProvider currentTimeProvider)
: base(messageBus, pathGroupPartition)
{
if (requestResponseSettings == null) throw new ArgumentNullException(nameof(requestResponseSettings));

MessageProcessor = new ResponseMessageProcessor<EventData>(MessageBus.LoggerFactory, requestResponseSettings, MessageBus, messagePayloadProvider: eventData => eventData.EventBody.ToArray());
MessageProcessor = new ResponseMessageProcessor<EventData>(MessageBus.LoggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider);
CheckpointTrigger = new CheckpointTrigger(requestResponseSettings, MessageBus.LoggerFactory);
}
}
6 changes: 4 additions & 2 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,19 @@ protected override async Task CreateConsumers()
{
await base.CreateConsumers();

object MessageProvider(Type messageType, EventData transportMessage) => Serializer.Deserialize(messageType, transportMessage.Body.ToArray());

foreach (var (groupPath, consumerSettings) in Settings.Consumers.GroupBy(x => new GroupPath(path: x.Path, group: x.GetGroup())).ToDictionary(x => x.Key, x => x.ToList()))
{
_logger.LogInformation("Creating consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
AddConsumer(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition)));
AddConsumer(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition, MessageProvider)));
}

if (Settings.RequestResponse != null)
{
var pathGroup = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup());
_logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", pathGroup.Path, pathGroup.Group);
AddConsumer(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition)));
AddConsumer(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, CurrentTimeProvider)));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
}

static void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx) => ctx.SetTransportMessage(m);
object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => Serializer.Deserialize(messageType, m.Body.ToArray());

foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers
.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings)))
Expand All @@ -95,7 +96,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
var messageProcessor = new MessageProcessor<ServiceBusReceivedMessage>(
consumerSettings,
this,
messageProvider: (messageType, m) => Serializer.Deserialize(messageType, m.Body.ToArray()),
messageProvider: MessageProvider,
path: path.ToString(),
responseProducer: this,
consumerContextInitializer: InitConsumerContext,
Expand All @@ -110,8 +111,9 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
var messageProcessor = new ResponseMessageProcessor<ServiceBusReceivedMessage>(
LoggerFactory,
Settings.RequestResponse,
responseConsumer: this,
messagePayloadProvider: m => m.Body.ToArray());
MessageProvider,
PendingRequestStore,
CurrentTimeProvider);

AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@ namespace SlimMessageBus.Host.Kafka;
/// </summary>
public class KafkaPartitionConsumerForResponses : KafkaPartitionConsumer
{
public KafkaPartitionConsumerForResponses(ILoggerFactory loggerFactory, RequestResponseSettings requestResponseSettings, string group, TopicPartition topicPartition, IKafkaCommitController commitController, IResponseConsumer responseConsumer, IMessageSerializer headerSerializer)
public KafkaPartitionConsumerForResponses(
ILoggerFactory loggerFactory,
RequestResponseSettings requestResponseSettings,
string group,
TopicPartition topicPartition,
IKafkaCommitController commitController,
MessageProvider<ConsumeResult> messageProvider,
IPendingRequestStore pendingRequestStore,
ICurrentTimeProvider currentTimeProvider,
IMessageSerializer headerSerializer)
: base(
loggerFactory,
[requestResponseSettings],
Expand All @@ -19,8 +28,9 @@ public KafkaPartitionConsumerForResponses(ILoggerFactory loggerFactory, RequestR
messageProcessor: new ResponseMessageProcessor<ConsumeResult>(
loggerFactory,
requestResponseSettings,
responseConsumer,
messagePayloadProvider: m => m.Message.Value))
messageProvider,
pendingRequestStore,
currentTimeProvider))
{
}
}
9 changes: 7 additions & 2 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,20 @@ void AddGroupConsumer(string group, IReadOnlyCollection<string> topics, Func<Top
AddConsumer(new KafkaGroupConsumer(LoggerFactory, ProviderSettings, group, topics, processorFactory));
}

IKafkaPartitionConsumer ResponseProcessorFactory(TopicPartition tp, IKafkaCommitController cc) => new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, this, HeaderSerializer);
object MessageProvider(Type messageType, ConsumeResult<Ignore, byte[]> transportMessage)
=> Serializer.Deserialize(messageType, transportMessage.Message.Value);

IKafkaPartitionConsumer ResponseProcessorFactory(TopicPartition tp, IKafkaCommitController cc)
=> new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, MessageProvider, PendingRequestStore, CurrentTimeProvider, HeaderSerializer);

foreach (var consumersByGroup in Settings.Consumers.GroupBy(x => x.GetGroup()))
{
var group = consumersByGroup.Key;
var consumersByTopic = consumersByGroup.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToArray());
var topics = consumersByTopic.Keys.ToList();

IKafkaPartitionConsumer ConsumerProcessorFactory(TopicPartition tp, IKafkaCommitController cc) => new KafkaPartitionConsumerForConsumers(LoggerFactory, consumersByTopic[tp.Topic], group, tp, cc, HeaderSerializer, this);
IKafkaPartitionConsumer ConsumerProcessorFactory(TopicPartition tp, IKafkaCommitController cc)
=> new KafkaPartitionConsumerForConsumers(LoggerFactory, consumersByTopic[tp.Topic], group, tp, cc, HeaderSerializer, this);

var processorFactory = ConsumerProcessorFactory;

Expand Down
5 changes: 3 additions & 2 deletions src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ void AddTopicConsumer(string topic, IMessageProcessor<MqttApplicationMessage> me
var processor = new ResponseMessageProcessor<MqttApplicationMessage>(
LoggerFactory,
Settings.RequestResponse,
responseConsumer: this,
messagePayloadProvider: m => m.PayloadSegment.Array);
messageProvider: MessageProvider,
PendingRequestStore,
CurrentTimeProvider);

AddTopicConsumer(Settings.RequestResponse.Path, processor);
}
Expand Down
6 changes: 4 additions & 2 deletions src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ protected override async Task CreateConsumers()

await base.CreateConsumers();

object MessageProvider(Type messageType, NatsMsg<byte[]> transportMessage) => Serializer.Deserialize(messageType, transportMessage.Data);

foreach (var (subject, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList()))
{
var processor = new MessageProcessor<NatsMsg<byte[]>>(
consumerSettings,
messageBus: this,
messageProvider: (type, message) => Serializer.Deserialize(type, message.Data),
messageProvider: MessageProvider,
subject,
this,
consumerErrorHandlerOpenGenericType: typeof(INatsConsumerErrorHandler<>));
Expand All @@ -71,7 +73,7 @@ protected override async Task CreateConsumers()

if (Settings.RequestResponse != null)
{
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, this, message => message.Data);
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, MessageProvider, PendingRequestStore, CurrentTimeProvider);
AddSubjectConsumer(Settings.RequestResponse.Path, processor);
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;

public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, IList<ConsumerSettings> consumers, IMessageSerializer serializer, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
public RabbitMqConsumer(
ILoggerFactory loggerFactory,
IRabbitMqChannel channel,
string queueName,
IList<ConsumerSettings> consumers,
IMessageSerializer serializer,
MessageBusBase messageBus,
MessageProvider<BasicDeliverEventArgs> messageProvider,
IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
{
_acknowledgementMode = consumers.Select(x => x.GetOrDefault<RabbitMqMessageAcknowledgementMode?>(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
Expand All @@ -21,7 +29,7 @@ public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel,
messageBus,
path: queueName,
responseProducer: messageBus,
messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()),
messageProvider: messageProvider,
consumerContextInitializer: InitializeConsumerContext,
consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade;

public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
public RabbitMqResponseConsumer(
ILoggerFactory loggerFactory,
IRabbitMqChannel channel,
string queueName,
RequestResponseSettings requestResponseSettings,
MessageProvider<BasicDeliverEventArgs> messageProvider,
IPendingRequestStore pendingRequestStore,
ICurrentTimeProvider currentTimeProvider,
IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
{
_messageProcessor = new ResponseMessageProcessor<BasicDeliverEventArgs>(loggerFactory, requestResponseSettings, messageBus, m => m.Body.ToArray());
_messageProcessor = new ResponseMessageProcessor<BasicDeliverEventArgs>(loggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider);
}

protected override async Task<Exception> OnMessageReceived(Dictionary<string, object> messageHeaders, BasicDeliverEventArgs transportMessage)
Expand Down
Loading

0 comments on commit ed73daa

Please sign in to comment.