diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index a1eda2f8..52d1bb2e 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 3.0.0-rc902 + 3.0.0-rc904 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs b/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs index ef231f89..2bd0f717 100644 --- a/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs +++ b/src/SlimMessageBus.Host.AspNetCore/HttpContextAccessorCurrentMessageBusProvider.cs @@ -3,24 +3,46 @@ /// /// Resolves the from the current ASP.NET Core web request (if present, otherwise falls back to the application root container). /// -public class HttpContextAccessorCurrentMessageBusProvider( +public partial class HttpContextAccessorCurrentMessageBusProvider( ILogger logger, IHttpContextAccessor httpContextAccessor, IServiceProvider serviceProvider) : CurrentMessageBusProvider(serviceProvider) { + private readonly ILogger _logger = logger; + public override IMessageBus GetCurrent() { // When the call to resolve the given type is made within an HTTP Request, use the request scope service provider var httpContext = httpContextAccessor?.HttpContext; if (httpContext != null) { - logger.LogTrace("The type IMessageBus will be requested from the per-request scope"); + LogCurrentFrom("request"); return httpContext.RequestServices.GetService(); } // otherwise use the app wide scope provider - logger.LogTrace("The type IMessageBus will be requested from the app scope"); + LogCurrentFrom("root"); return base.GetCurrent(); } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Trace, + Message = "The type IMessageBus will be requested from the {ScopeName} scope")] + private partial void LogCurrentFrom(string scopeName); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class HttpContextAccessorCurrentMessageBusProvider +{ + private partial void LogCurrentFrom(string scopeName) + => _logger.LogTrace("The type IMessageBus will be requested from the {ScopeName} scope", scopeName); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs index 30658f06..f46f14d4 100644 --- a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs +++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs @@ -3,7 +3,7 @@ /// /// Circuit breaker to toggle consumer status on an external events. /// -internal sealed class CircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor +internal sealed class CircuitBreakerAbstractConsumerInterceptor(ILogger logger) : IAbstractConsumerInterceptor { public int Order => 100; @@ -33,12 +33,12 @@ async Task BreakerChanged(Circuit state) var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default"; if (shouldPause) { - consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); + logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus); await consumer.DoStop().ConfigureAwait(false); } else { - consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); + logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus); await consumer.DoStart().ConfigureAwait(false); } consumer.SetIsPaused(shouldPause); diff --git a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj index b01a3b49..46567f42 100644 --- a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj +++ b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj @@ -6,7 +6,7 @@ Core configuration interfaces of SlimMessageBus SlimMessageBus SlimMessageBus.Host - 3.0.0-rc901 + 3.0.0-rc904 diff --git a/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj b/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj index b98b06a5..e44ce8c0 100644 --- a/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj +++ b/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj @@ -3,7 +3,7 @@ - 3.0.0-rc901 + 3.0.0-rc904 Core interceptor interfaces of SlimMessageBus SlimMessageBus diff --git a/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs b/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs index 4ceb8ce0..d634c221 100644 --- a/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs +++ b/src/SlimMessageBus.Host.Memory/Consumers/AbstractMessageProcessorQueue.cs @@ -1,7 +1,9 @@ namespace SlimMessageBus.Host.Memory; -public abstract class AbstractMessageProcessorQueue(IMessageProcessor messageProcessor, ILogger logger) : IMessageProcessorQueue +public abstract partial class AbstractMessageProcessorQueue(IMessageProcessor messageProcessor, ILogger logger) : IMessageProcessorQueue { + private readonly ILogger _logger = logger; + public abstract void Enqueue(object transportMessage, IReadOnlyDictionary messageHeaders); protected async Task ProcessMessage(object transportMessage, IReadOnlyDictionary messageHeaders, CancellationToken cancellationToken) @@ -23,7 +25,27 @@ protected async Task ProcessMessage(object transportMessage, IReadOnlyDictionary if (r.Exception != null) { // We rely on the IMessageProcessor to execute the ConsumerErrorHandler, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost. - logger.LogError(r.Exception, "Error processing message {Message} of type {MessageType}", transportMessage, transportMessage.GetType()); + LogMessageError(transportMessage, transportMessage.GetType(), r.Exception); } } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Error, + Message = "Error processing message {TransportMessage} of type {TransportMessageType}")] + private partial void LogMessageError(object transportMessage, Type transportMessageType, Exception e); + + #endregion } + +#if NETSTANDARD2_0 + +public abstract partial class AbstractMessageProcessorQueue +{ + private partial void LogMessageError(object transportMessage, Type transportMessageType, Exception e) + => _logger.LogError(e, "Error processing message {TransportMessage} of type {TransportMessageType}", transportMessage, transportMessageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Memory/Consumers/MessageProcessorQueue.cs b/src/SlimMessageBus.Host.Memory/Consumers/MessageProcessorQueue.cs index 5177abbc..090771fa 100644 --- a/src/SlimMessageBus.Host.Memory/Consumers/MessageProcessorQueue.cs +++ b/src/SlimMessageBus.Host.Memory/Consumers/MessageProcessorQueue.cs @@ -1,6 +1,9 @@ namespace SlimMessageBus.Host.Memory; -public class MessageProcessorQueue(IMessageProcessor messageProcessor, ILogger logger, CancellationToken cancellationToken) : AbstractMessageProcessorQueue(messageProcessor, logger) +public class MessageProcessorQueue(IMessageProcessor messageProcessor, + ILogger logger, + CancellationToken cancellationToken) + : AbstractMessageProcessorQueue(messageProcessor, logger) { private readonly object _prevTaskLock = new(); private Task _prevTask = null; diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index 20b36e1a..44d1a7af 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -5,7 +5,7 @@ /// /// In-memory message bus implementation to use for in process message passing. /// -public class MemoryMessageBus : MessageBusBase +public partial class MemoryMessageBus : MessageBusBase { private readonly ILogger _logger; private IDictionary> _messageProcessorByPath; @@ -63,11 +63,8 @@ public override IDictionary CreateHeaders() public override bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) { -#if NETSTANDARD2_0 if (consumerSettings is null) throw new ArgumentNullException(nameof(consumerSettings)); -#else - ArgumentNullException.ThrowIfNull(consumerSettings); -#endif + if (consumerContextProperties != null && consumerContextProperties.ContainsKey(MemoryMessageBusProperties.CreateScope)) { return true; @@ -133,7 +130,7 @@ private async Task ProduceInternal(object me path ??= GetDefaultPath(producerSettings.MessageType, producerSettings); if (!_messageProcessorByPath.TryGetValue(path, out var messageProcessor)) { - _logger.LogDebug("No consumers interested in message type {MessageType} on path {Path}", messageType, path); + LogNoConsumerInterestedInMessageType(path, messageType); return default; } @@ -165,4 +162,24 @@ private async Task ProduceInternal(object me } return (TResponseMessage)r.Response; } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "No consumers interested in message type {MessageType} on path {Path}")] + private partial void LogNoConsumerInterestedInMessageType(string path, Type messageType); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class MemoryMessageBus +{ + private partial void LogNoConsumerInterestedInMessageType(string path, Type messageType) + => _logger.LogDebug("No consumers interested in message type {MessageType} on path {Path}", messageType, path); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Serialization.Avro/AvroMessageSerializer.cs b/src/SlimMessageBus.Host.Serialization.Avro/AvroMessageSerializer.cs index 8b49db64..35c48e16 100644 --- a/src/SlimMessageBus.Host.Serialization.Avro/AvroMessageSerializer.cs +++ b/src/SlimMessageBus.Host.Serialization.Avro/AvroMessageSerializer.cs @@ -52,8 +52,8 @@ public AvroMessageSerializer(ILoggerFactory loggerFactory = null) var mf = new ReflectionMessageCreationStrategy(loggerFactory.CreateLogger()); var ml = new ReflectionSchemaLookupStrategy(loggerFactory.CreateLogger()); - MessageFactory = (Type type) => mf.Create(type); - WriteSchemaLookup = (Type type) => ml.Lookup(type); + MessageFactory = mf.Create; + WriteSchemaLookup = ml.Lookup; ReadSchemaLookup = WriteSchemaLookup; } @@ -73,7 +73,7 @@ public AvroMessageSerializer(ILoggerFactory loggerFactory, IMessageCreationStrat public object Deserialize(Type t, byte[] payload) { using var ms = ReadMemoryStreamFactory(payload); - + var dec = new BinaryDecoder(ms); var message = MessageFactory(t); @@ -84,7 +84,7 @@ public object Deserialize(Type t, byte[] payload) var writerSchema = WriteSchemaLookup(t); AssertSchemaNotNull(t, writerSchema, true); - _logger.LogDebug("Type {0} writer schema: {1}, reader schema: {2}", t, writerSchema, readerSchema); + _logger.LogDebug("Type {Type} writer schema: {WriterSchema}, reader schema: {ReaderSchema}", t, writerSchema, readerSchema); var reader = new SpecificDefaultReader(writerSchema, readerSchema); reader.Read(message, dec); @@ -108,7 +108,7 @@ public byte[] Serialize(Type t, object message) var writerSchema = WriteSchemaLookup(t); AssertSchemaNotNull(t, writerSchema, true); - _logger.LogDebug("Type {0} writer schema: {1}", t, writerSchema); + _logger.LogDebug("Type {Type} writer schema: {WriterSchema}", t, writerSchema); var writer = new SpecificDefaultWriter(writerSchema); // Schema comes from pre-compiled, code-gen phase writer.Write(message, enc); diff --git a/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj b/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj index 4a23fc6e..ca482ff8 100644 --- a/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj +++ b/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj @@ -3,7 +3,7 @@ - 3.0.0-rc901 + 3.0.0-rc904 Core serialization interfaces of SlimMessageBus SlimMessageBus diff --git a/src/SlimMessageBus.Host/Collections/KindMapping.cs b/src/SlimMessageBus.Host/Collections/KindMapping.cs index 21acf909..a14f6bb5 100644 --- a/src/SlimMessageBus.Host/Collections/KindMapping.cs +++ b/src/SlimMessageBus.Host/Collections/KindMapping.cs @@ -2,8 +2,8 @@ public class KindMapping { - private readonly Dictionary _kindByTopic = new(); - private readonly Dictionary _kindByMessageType = new(); + private readonly Dictionary _kindByTopic = []; + private readonly Dictionary _kindByMessageType = []; public void Configure(MessageBusSettings settings) { diff --git a/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs b/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs index ff6e2bcb..614416a2 100644 --- a/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs +++ b/src/SlimMessageBus.Host/Collections/ProducerByMessageTypeCache.cs @@ -5,7 +5,7 @@ /// The message type hierarchy is discovered at runtime and cached for faster access. /// /// The producer type -public class ProducerByMessageTypeCache : IReadOnlyCache +public partial class ProducerByMessageTypeCache : IReadOnlyCache where TProducer : class { private readonly ILogger _logger; @@ -37,7 +37,7 @@ private TProducer CalculateProducer(Type messageType) var assignableProducer = assignableProducers.FirstOrDefault(); if (assignableProducer.Key != null) { - _logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", assignableProducer.Key, messageType); + LogMatchedProducerForMessageType(messageType, assignableProducer.Key); return assignableProducer.Value; } @@ -52,7 +52,7 @@ private TProducer CalculateProducer(Type messageType) } } - _logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType); + LogUnmatchedProducerForMessageType(messageType); // Note: Nulls are also added to dictionary, so that we don't look them up using reflection next time (cached). return null; @@ -74,4 +74,33 @@ private static int CalculateBaseClassDistance(Type type, Type baseType) return distance; } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}")] + private partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Unable to match any declared producer for dispatched message type {MessageType}")] + private partial void LogUnmatchedProducerForMessageType(Type messageType); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class ProducerByMessageTypeCache +{ + private partial void LogMatchedProducerForMessageType(Type messageType, Type producerMessageType) + => _logger.LogDebug("Matched producer for message type {ProducerMessageType} for dispatched message type {MessageType}", producerMessageType, messageType); + + private partial void LogUnmatchedProducerForMessageType(Type messageType) + => _logger.LogDebug("Unable to match any declared producer for dispatched message type {MessageType}", messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs index c5c8d736..364485c0 100644 --- a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs +++ b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs @@ -1,7 +1,8 @@ namespace SlimMessageBus.Host; -public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl +public abstract partial class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl { + protected readonly ILogger Logger; private readonly SemaphoreSlim _semaphore; private readonly IReadOnlyList _interceptors; private CancellationTokenSource _cancellationTokenSource; @@ -10,7 +11,6 @@ public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable public bool IsStarted { get; private set; } public string Path { get; } - public ILogger Logger { get; } public IReadOnlyList Settings { get; } protected CancellationToken CancellationToken => _cancellationTokenSource.Token; @@ -39,7 +39,7 @@ private async Task CallInterceptor(Func Logger.LogError(ex, "Interceptor {InterceptorType} failed with error: {Error}", interceptorType, error); } + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs b/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs index 49f474a9..7787b853 100644 --- a/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs +++ b/src/SlimMessageBus.Host/Consumer/Checkpointing/CheckpointTrigger.cs @@ -2,7 +2,7 @@ using System.Diagnostics; -public class CheckpointTrigger : ICheckpointTrigger +public partial class CheckpointTrigger : ICheckpointTrigger { private readonly ILogger _logger; @@ -35,7 +35,6 @@ public static CheckpointValue GetCheckpointValue(HasProviderExtensions settings) => new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault), settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault)); - #region Implementation of ICheckpointTrigger public bool IsEnabled @@ -53,17 +52,37 @@ public bool Increment() var enabled = IsEnabled; if (enabled && _logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", _lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds); + LogCheckpointTriggered(_lastCheckpointCount, _lastCheckpointDuration.Elapsed.Seconds); } return enabled; - } - + } + public void Reset() { _lastCheckpointCount = 0; _lastCheckpointDuration.Restart(); - } - + } + + #endregion + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)")] + private partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration); + #endregion } + +#if NETSTANDARD2_0 + +public partial class CheckpointTrigger +{ + private partial void LogCheckpointTriggered(int checkpointCount, int checkpointDuration) + => _logger.LogDebug("Checkpoint triggered after Count: {CheckpointCount}, Duration: {CheckpointDuration} (s)", checkpointCount, checkpointDuration); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs index 0fba6f97..5c0230ae 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs @@ -5,7 +5,7 @@ /// The expectation is that will be executed synchronously (in sequential order) by the caller on which we want to increase amount of concurrent transportMessage being processed. /// /// -public sealed class ConcurrentMessageProcessorDecorator : IMessageProcessor, IDisposable +public sealed partial class ConcurrentMessageProcessorDecorator : IMessageProcessor, IDisposable { private readonly ILogger _logger; private SemaphoreSlim _concurrentSemaphore; @@ -87,7 +87,8 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti { try { - _logger.LogDebug("Entering ProcessMessages for message {MessageType}", typeof(TMessage)); + LogEntering(typeof(TMessage)); + var r = await _target.ProcessMessage(transportMessage, messageHeaders, consumerContextProperties, currentServiceProvider, cancellationToken).ConfigureAwait(false); if (r.Exception != null) { @@ -105,10 +106,39 @@ private async Task ProcessInBackground(TMessage transportMessage, IReadOnlyDicti } finally { - _logger.LogDebug("Leaving ProcessMessages for message {MessageType}", typeof(TMessage)); + LogLeaving(typeof(TMessage)); _concurrentSemaphore?.Release(); Interlocked.Decrement(ref _pendingCount); } } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Entering ProcessMessages for message {MessageType}")] + private partial void LogEntering(Type messageType); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Leaving ProcessMessages for message {MessageType}")] + private partial void LogLeaving(Type messageType); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class ConcurrentMessageProcessorDecorator +{ + private partial void LogEntering(Type messageType) + => _logger.LogDebug("Entering ProcessMessages for message {MessageType}", messageType); + + private partial void LogLeaving(Type messageType) + => _logger.LogDebug("Leaving ProcessMessages for message {MessageType}", messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs index 62d873ae..e785d199 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs @@ -2,7 +2,7 @@ using SlimMessageBus.Host.Consumer; -public class MessageHandler : IMessageHandler +public partial class MessageHandler : IMessageHandler { private readonly ILogger _logger; private readonly IMessageScopeFactory _messageScopeFactory; @@ -31,11 +31,7 @@ public MessageHandler( string path, Type consumerErrorHandlerOpenGenericType = null) { -#if NETSTANDARD2_0 if (messageBus is null) throw new ArgumentNullException(nameof(messageBus)); -#else - ArgumentNullException.ThrowIfNull(messageBus); -#endif _logger = messageBus.LoggerFactory.CreateLogger(); _messageScopeFactory = messageScopeFactory; @@ -127,7 +123,7 @@ public MessageHandler( { if (consumerInvoker.ParentSettings.IsDisposeConsumerEnabled && consumerInstance is IDisposable consumerInstanceDisposable) { - _logger.LogDebug("Disposing consumer instance {Consumer} of type {ConsumerType}", consumerInstance, consumerType); + LogDisposingConsumer(consumerType, consumerInstance); consumerInstanceDisposable.DisposeSilently("ConsumerInstance", _logger); } } @@ -162,7 +158,7 @@ private async Task DoHandleError(object message, Type messageType if (consumerErrorHandler != null) { - _logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandler.GetType(), messageType); + LogConsumerErrorHandlerWillBeUsed(messageType, consumerErrorHandler.GetType(), ex); var consumerErrorHandlerMethod = RuntimeTypeCache.ConsumerErrorHandlerType[messageType]; errorHandlerResult = await consumerErrorHandlerMethod(consumerErrorHandler, message, consumerContext, ex, attempts).ConfigureAwait(false); @@ -211,4 +207,33 @@ public async Task ExecuteConsumer(object message, IConsumerContext consu return null; } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Disposing consumer instance {Consumer} of type {ConsumerType}")] + private partial void LogDisposingConsumer(Type consumerType, object consumer); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}")] + private partial void LogConsumerErrorHandlerWillBeUsed(Type messageType, Type consumerErrorHandlerType, Exception ex); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class MessageHandler +{ + private partial void LogDisposingConsumer(Type consumerType, object consumer) + => _logger.LogDebug("Disposing consumer instance {Consumer} of type {ConsumerType}", consumer, consumerType); + + private partial void LogConsumerErrorHandlerWillBeUsed(Type messageType, Type consumerErrorHandlerType, Exception ex) + => _logger.LogDebug(ex, "Consumer error handler of type {ConsumerErrorHandlerType} will be used to handle the exception during processing of message of type {MessageType}", consumerErrorHandlerType, messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index 74f19aa2..47d4b727 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -6,7 +6,7 @@ /// Implementation of that performs orchestration around processing of a new message using an instance of the declared consumer ( or interface). /// /// -public class MessageProcessor : MessageHandler, IMessageProcessor +public partial class MessageProcessor : MessageHandler, IMessageProcessor { private readonly ILogger _logger; private readonly MessageProvider _messageProvider; @@ -131,14 +131,14 @@ public async virtual Task ProcessMessage(TTransportMessage } catch (Exception e) { - _logger.LogDebug(e, "Processing of the message {TransportMessage} of type {MessageType} failed", transportMessage, messageType); + LogProcessingMessageFailedTypeKnown(transportMessage, messageType, e); lastException ??= e; } } } catch (Exception e) { - _logger.LogDebug(e, "Processing of the message {TransportMessage} failed", transportMessage); + LogProcessingMessageFailed(transportMessage, e); lastException = e; } return new(result, lastException, lastException != null ? lastConsumerInvoker?.ParentSettings : null, lastResponse); @@ -151,7 +151,7 @@ protected Type GetMessageType(IReadOnlyDictionary headers) var messageType = MessageTypeResolver.ToType(messageTypeName); if (messageType != null) { - _logger.LogDebug("Message type {MessageType} was declared in the message header", messageType); + LogMessageTypeDeclaredInHeader(messageType); return messageType; } @@ -164,11 +164,11 @@ protected Type GetMessageType(IReadOnlyDictionary headers) if (_singleInvoker != null) { - _logger.LogDebug("No message type header was present, defaulting to the only declared message type {MessageType}", _singleInvoker.MessageType); + LogMessageTypeHeaderMissingAndDefaulting(_singleInvoker.MessageType); return _singleInvoker.MessageType; } - _logger.LogDebug("No message type header was present in the message header, multiple consumer types declared therefore cannot infer the message type"); + LogNoMessageTypeHeaderPresent(); if (_shouldFailWhenUnrecognizedMessageType) { @@ -198,7 +198,8 @@ protected IEnumerable TryMatchConsumerInvok { if (_shouldLogWhenUnrecognizedMessageType) { - _logger.LogInformation("The message on path {Path} declared {HeaderName} header of type {MessageType}, but none of the known consumer types {ConsumerTypes} was able to handle it", Path, MessageHeaders.MessageType, messageType, string.Join(",", _invokers.Select(x => x.ConsumerType.Name))); + var consumerTypes = string.Join(",", _invokers.Select(x => x.ConsumerType.Name)); + LogNoConsumerTypeMatched(messageType, Path, MessageHeaders.MessageType, consumerTypes); } if (_shouldFailWhenUnrecognizedMessageType) @@ -208,4 +209,69 @@ protected IEnumerable TryMatchConsumerInvok } } } -} \ No newline at end of file + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Processing of the message {TransportMessage} of type {MessageType} failed")] + private partial void LogProcessingMessageFailedTypeKnown(TTransportMessage transportMessage, Type messageType, Exception e); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Processing of the message {TransportMessage} failed")] + private partial void LogProcessingMessageFailed(TTransportMessage transportMessage, Exception e); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Message type {MessageType} was declared in the message header")] + private partial void LogMessageTypeDeclaredInHeader(Type messageType); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Debug, + Message = "No message type header was present, defaulting to the only declared message type {MessageType}")] + private partial void LogMessageTypeHeaderMissingAndDefaulting(Type messageType); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Debug, + Message = "No message type header was present in the message header, multiple consumer types declared therefore cannot infer the message type")] + private partial void LogNoMessageTypeHeaderPresent(); + + [LoggerMessage( + EventId = 5, + Level = LogLevel.Information, + Message = "The message on path {Path} declared {HeaderName} header of type {MessageType}, but none of the known consumer types {ConsumerTypes} was able to handle it")] + private partial void LogNoConsumerTypeMatched(Type messageType, string path, string headerName, string consumerTypes); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class MessageProcessor +{ + private partial void LogProcessingMessageFailedTypeKnown(TTransportMessage transportMessage, Type messageType, Exception e) + => _logger.LogDebug(e, "Processing of the message {TransportMessage} of type {MessageType} failed", transportMessage, messageType); + + private partial void LogProcessingMessageFailed(TTransportMessage transportMessage, Exception e) + => _logger.LogDebug(e, "Processing of the message {TransportMessage} failed", transportMessage); + + private partial void LogMessageTypeDeclaredInHeader(Type messageType) + => _logger.LogDebug("Message type {MessageType} was declared in the message header", messageType); + + private partial void LogMessageTypeHeaderMissingAndDefaulting(Type messageType) + => _logger.LogDebug("No message type header was present, defaulting to the only declared message type {MessageType}", messageType); + + private partial void LogNoMessageTypeHeaderPresent() + => _logger.LogDebug("No message type header was present in the message header, multiple consumer types declared therefore cannot infer the message type"); + + private partial void LogNoConsumerTypeMatched(Type messageType, string path, string headerName, string consumerTypes) + => _logger.LogInformation("The message on path {Path} declared {HeaderName} header of type {MessageType}, but none of the known consumer types {ConsumerTypes} was able to handle it", path, headerName, messageType, consumerTypes); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index 2290b8f8..b1f02b40 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -6,7 +6,7 @@ public abstract class ResponseMessageProcessor; /// The implementation that processes the responses arriving to the bus. /// /// -public class ResponseMessageProcessor : ResponseMessageProcessor, IMessageProcessor +public partial class ResponseMessageProcessor : ResponseMessageProcessor, IMessageProcessor { private readonly ILogger _logger; private readonly RequestResponseSettings _requestResponseSettings; @@ -42,7 +42,7 @@ public Task ProcessMessage(TTransportMessage transportMess } catch (Exception e) { - _logger.LogError(e, "Error occurred while consuming response message, {Message}", transportMessage); + LogErrorConsumingResponse(transportMessage, e); // We can only continue and process all messages in the lease ex = e; } @@ -68,7 +68,7 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p var requestState = _pendingRequestStore.GetById(requestId); if (requestState == null) { - _logger.LogDebug("The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).", requestId, path); + LogResponseWillBeDiscarded(path, requestId); // ToDo: add and API hook to these kind of situation return null; } @@ -77,8 +77,8 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p { if (_logger.IsEnabled(LogLevel.Debug)) { - var tookTimespan = _currentTimeProvider.CurrentTime.Subtract(requestState.Created); - _logger.LogDebug("Response arrived for {Request} on path {Path} (time: {RequestTime} ms)", requestState, path, tookTimespan); + var requestTime = _currentTimeProvider.CurrentTime.Subtract(requestState.Created); + LogResponseArrived(path, requestState, requestTime); } if (responseHeaders.TryGetHeader(ReqRespMessageHeaders.Error, out string errorMessage)) @@ -86,7 +86,7 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p // error response arrived var responseException = new RequestHandlerFaultedMessageBusException(errorMessage); - _logger.LogDebug(responseException, "Response arrived for {Request} on path {Path} with error: {ResponseError}", requestState, path, responseException.Message); + LogResponseArrivedWithError(path, requestState, responseException, responseException.Message); requestState.TaskCompletionSource.TrySetException(responseException); } else @@ -104,7 +104,7 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p } catch (Exception e) { - _logger.LogDebug(e, "Could not deserialize the response message for {Request} arriving on path {Path}", requestState, path); + LogResponseCouldNotDeserialize(path, requestState, e); requestState.TaskCompletionSource.TrySetException(e); } } @@ -117,4 +117,60 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p return null; } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Error, + Message = "Error occurred while consuming response message, {Message}")] + private partial void LogErrorConsumingResponse(TTransportMessage message, Exception e); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).")] + private partial void LogResponseWillBeDiscarded(string path, string requestId); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Response arrived for {RequestState} on path {Path} (time: {RequestTime} ms)")] + private partial void LogResponseArrived(string path, PendingRequestState requestState, TimeSpan requestTime); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Debug, + Message = "Response arrived for {RequestState} on path {Path} with error: {ResponseError}")] + private partial void LogResponseArrivedWithError(string path, PendingRequestState requestState, Exception e, string responseError); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Debug, + Message = "Could not deserialize the response message for {RequestState} arriving on path {Path}")] + private partial void LogResponseCouldNotDeserialize(string path, PendingRequestState requestState, Exception e); + + #endregion +} + +#if NETSTANDARD2_0 + +public partial class ResponseMessageProcessor +{ + private partial void LogErrorConsumingResponse(TTransportMessage message, Exception e) + => _logger.LogError(e, "Error occurred while consuming response message, {Message}", message); + + private partial void LogResponseWillBeDiscarded(string path, string requestId) + => _logger.LogDebug("The response message for request id {RequestId} arriving on path {Path} will be disregarded. Either the request had already expired, had been cancelled or it was already handled (this response message is a duplicate).", requestId, path); + + private partial void LogResponseArrived(string path, PendingRequestState requestState, TimeSpan requestTime) + => _logger.LogDebug("Response arrived for {RequestState} on path {Path} (time: {RequestTime} ms)", requestState, path, requestTime); + + private partial void LogResponseArrivedWithError(string path, PendingRequestState requestState, Exception e, string responseError) + => _logger.LogDebug(e, "Response arrived for {RequestState} on path {Path} with error: {ResponseError}", requestState, path, responseError); + + private partial void LogResponseCouldNotDeserialize(string path, PendingRequestState requestState, Exception e) + => _logger.LogDebug(e, "Could not deserialize the response message for {RequestState} arriving on path {Path}", requestState, path); } + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Helpers/CompatAttributes.cs b/src/SlimMessageBus.Host/Helpers/CompatAttributes.cs new file mode 100644 index 00000000..3e322b75 --- /dev/null +++ b/src/SlimMessageBus.Host/Helpers/CompatAttributes.cs @@ -0,0 +1,45 @@ +#if NETSTANDARD2_0 + +namespace SlimMessageBus.Host; + +[AttributeUsage(AttributeTargets.Method)] +public class LoggerMessageAttribute : Attribute +{ + /// + /// Gets the logging event id for the logging method. + /// + public int EventId { get; set; } = -1; + + /// + /// Gets or sets the logging event name for the logging method. + /// + /// + /// This will equal the method name if not specified. + /// + public string EventName { get; set; } + + /// + /// Gets the logging level for the logging method. + /// + public LogLevel Level { get; set; } = LogLevel.None; + + /// + /// Gets the message text for the logging method. + /// + public string Message { get; set; } = ""; + + /// + /// Gets the flag to skip IsEnabled check for the logging method. + /// + public bool SkipEnabledCheck { get; set; } + + public LoggerMessageAttribute() + { + } + + public LoggerMessageAttribute(int eventId, LogLevel level, string message) + { + } +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Helpers/CompatMethods.cs b/src/SlimMessageBus.Host/Helpers/CompatMethods.cs index 145d8865..d02ebaa3 100644 --- a/src/SlimMessageBus.Host/Helpers/CompatMethods.cs +++ b/src/SlimMessageBus.Host/Helpers/CompatMethods.cs @@ -25,8 +25,6 @@ public static bool TryAdd(this IDictionary dict, K key, V value) public static HashSet ToHashSet(this IEnumerable items) => new(items); -#if NETSTANDARD2_0 - public static IEnumerable> Chunk(this IEnumerable items, int size) { var chunk = new List(size); @@ -50,7 +48,6 @@ public static IEnumerable> Chunk(this IEnumerable i } } -#endif } public static class TimeSpanExtensions @@ -59,4 +56,4 @@ public static TimeSpan Multiply(this TimeSpan timeSpan, double factor) => TimeSpan.FromMilliseconds(timeSpan.TotalMilliseconds * factor); } -#endif +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Helpers/CompatRecord.cs b/src/SlimMessageBus.Host/Helpers/CompatRecord.cs index 56a4fc7d..118d95fe 100644 --- a/src/SlimMessageBus.Host/Helpers/CompatRecord.cs +++ b/src/SlimMessageBus.Host/Helpers/CompatRecord.cs @@ -1,4 +1,4 @@ -#if NETSTANDARD2_0 || NETSTANDARD2_1 || NETCOREAPP2_0 || NETCOREAPP2_1 || NETCOREAPP2_2 || NETCOREAPP3_0 || NETCOREAPP3_1 || NET45 || NET451 || NET452 || NET46 || NET461 || NET462 || NET47 || NET471 || NET472 || NET48 +#if NETSTANDARD2_0 // See https://github.com/dotnet/roslyn/issues/45510#issuecomment-725091019 diff --git a/src/SlimMessageBus.Host/Retry.cs b/src/SlimMessageBus.Host/Helpers/Retry.cs similarity index 88% rename from src/SlimMessageBus.Host/Retry.cs rename to src/SlimMessageBus.Host/Helpers/Retry.cs index 5c58579d..b56cbdc7 100644 --- a/src/SlimMessageBus.Host/Retry.cs +++ b/src/SlimMessageBus.Host/Helpers/Retry.cs @@ -6,17 +6,8 @@ public static class Retry public static async Task WithDelay(Func operation, Func shouldRetry, TimeSpan? delay, TimeSpan? jitter = default, CancellationToken cancellationToken = default) { -#if NETSTANDARD2_0 if (operation is null) throw new ArgumentNullException(nameof(operation)); -#else - ArgumentNullException.ThrowIfNull(operation); -#endif - -#if NETSTANDARD2_0 if (shouldRetry is null) throw new ArgumentNullException(nameof(shouldRetry)); -#else - ArgumentNullException.ThrowIfNull(shouldRetry); -#endif var attempt = 0; do diff --git a/src/SlimMessageBus.Host/Helpers/Utils.cs b/src/SlimMessageBus.Host/Helpers/Utils.cs index 8d1cd9a1..8d4c544d 100644 --- a/src/SlimMessageBus.Host/Helpers/Utils.cs +++ b/src/SlimMessageBus.Host/Helpers/Utils.cs @@ -30,22 +30,18 @@ public static async ValueTask DisposeSilently(this IAsyncDisposable disposable, } public static void DisposeSilently(this IDisposable disposable, string name, ILogger logger) - { - disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); - } - - public static void DisposeSilently(this IDisposable disposable, Func nameFunc, ILogger logger) - { - disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", nameFunc())); - } + => disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); public static ValueTask DisposeSilently(this IAsyncDisposable disposable, Func nameFunc, ILogger logger) - { - return disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", nameFunc())); - } + => disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", nameFunc())); public static ValueTask DisposeSilently(this IAsyncDisposable disposable, string name, ILogger logger) + => disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); + + public static string JoinOrSingle(this T[] values, Func selector, string separator = ",") => values.Length switch { - return disposable.DisposeSilently(e => logger.LogWarning(e, "Error occurred while disposing {Name}", name)); - } + 0 => string.Empty, + 1 => selector(values[0]), + _ => string.Join(separator, values.Select(selector)) + }; } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs b/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs index 0cffd609..66a47ba9 100644 --- a/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs +++ b/src/SlimMessageBus.Host/Hybrid/HybridMessageBus.cs @@ -4,7 +4,7 @@ using SlimMessageBus.Host.Serialization; -public class HybridMessageBus : IMasterMessageBus, ICompositeMessageBus, IDisposable, IAsyncDisposable +public partial class HybridMessageBus : IMasterMessageBus, ICompositeMessageBus, IDisposable, IAsyncDisposable { private readonly ILogger _logger; private readonly Dictionary _busByName; @@ -121,7 +121,8 @@ protected virtual MessageBusBase[] Route(object message, string path) { if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Resolved bus {BusName} for message type: {MessageType} and path {Path}", string.Join(",", buses.Select(x => x.Settings.Name)), messageType, path); + var busName = buses.JoinOrSingle(x => x.Settings.Name); + LogResolvedBus(path, messageType, busName); } return buses; } @@ -134,7 +135,7 @@ protected virtual MessageBusBase[] Route(object message, string path) // Add the message type, so that we only emit warn log once if (ProviderSettings.UndeclaredMessageTypeMode == UndeclaredMessageTypeMode.RaiseOneTimeLog && _undeclaredMessageType.TryAdd(messageType, true)) { - _logger.LogInformation("Could not find any bus that produces the message type: {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.", messageType); + LogCouldNotFindBus(messageType); } return []; @@ -198,4 +199,33 @@ public IMasterMessageBus GetChildBus(string name) public IEnumerable GetChildBuses() => _busByName.Values; #endregion + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Resolved bus {BusName} for message type {MessageType} and path {Path}")] + private partial void LogResolvedBus(string path, Type messageType, string busName); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Information, + Message = "Could not find any bus that produces the message type {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.")] + private partial void LogCouldNotFindBus(Type messageType); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class HybridMessageBus +{ + private partial void LogResolvedBus(string path, Type messageType, string busName) + => _logger.LogDebug("Resolved bus {BusName} for message type {MessageType} and path {Path}", busName, messageType, path); + + private partial void LogCouldNotFindBus(Type messageType) + => _logger.LogInformation("Could not find any bus that produces the message type {MessageType}. Messages of that type will not be delivered to any child bus. Double check the message bus configuration.", messageType); +} + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index 02384e3a..827bf499 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -12,7 +12,7 @@ public abstract class MessageBusBase(MessageBusSettings setti public TProviderSettings ProviderSettings { get; } = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings)); } -public abstract class MessageBusBase : IDisposable, IAsyncDisposable, +public abstract partial class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMessageBus, IMessageScopeFactory, IMessageHeadersFactory, @@ -111,7 +111,7 @@ protected MessageBusBase(MessageBusSettings settings) protected virtual IMessageSerializer GetSerializer() => Settings.GetSerializer(Settings.ServiceProvider); - protected virtual IMessageBusSettingsValidationService ValidationService { get => new DefaultMessageBusSettingsValidationService(Settings); } + protected virtual IMessageBusSettingsValidationService ValidationService => new DefaultMessageBusSettingsValidationService(Settings); /// /// Called by the provider to initialize the bus. @@ -137,7 +137,7 @@ protected void OnBuildProvider() } catch (Exception e) { - _logger.LogError(e, "Could not auto start consumers"); + LogCouldNotStartConsumers(e); } }); } @@ -191,17 +191,16 @@ public async Task Start() try { - await InitTaskList.EnsureAllFinished(); - - _logger.LogInformation("Starting consumers for {BusName} bus...", Name); + await InitTaskList.EnsureAllFinished().ConfigureAwait(false); + LogStartingConsumers(Name); await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false); - await CreateConsumers(); + await CreateConsumers().ConfigureAwait(false); await OnStart().ConfigureAwait(false); await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false); await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false); - _logger.LogInformation("Started consumers for {BusName} bus", Name); + LogStartedConsumers(Name); lock (_startLock) { @@ -230,9 +229,9 @@ public async Task Stop() try { - await InitTaskList.EnsureAllFinished(); + await InitTaskList.EnsureAllFinished().ConfigureAwait(false); - _logger.LogInformation("Stopping consumers for {BusName} bus...", Name); + LogStoppingConsumers(Name); await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false); await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false); @@ -240,7 +239,7 @@ public async Task Stop() await DestroyConsumers().ConfigureAwait(false); await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false); - _logger.LogInformation("Stopped consumers for {BusName} bus", Name); + LogStoppedConsumers(Name); lock (_startLock) { @@ -332,13 +331,13 @@ protected async virtual ValueTask DisposeAsyncCore() protected virtual Task CreateConsumers() { - _logger.LogInformation("Creating consumers for {BusName} bus...", Name); + LogCreatingConsumers(Name); return Task.CompletedTask; } protected async virtual Task DestroyConsumers() { - _logger.LogInformation("Destroying consumers for {BusName} bus...", Name); + LogDestroyingConsumers(Name); foreach (var consumer in _consumers) { @@ -370,7 +369,7 @@ protected virtual string GetDefaultPath(Type messageType, ProducerSettings produ var path = producerSettings.DefaultPath ?? throw new ProducerMessageBusException($"An attempt to produce message of type {messageType} without specifying path, but there was no default path configured. Double check your configuration."); - _logger.LogDebug("Applying default path {Path} for message type {MessageType}", path, messageType); + LogApplyingDefaultPath(messageType, path); return path; } @@ -383,10 +382,10 @@ public abstract Task ProduceToTransport( CancellationToken cancellationToken); protected void OnProduceToTransport(object message, - Type messageType, - string path, - IDictionary messageHeaders) - => _logger.LogDebug("Producing message {Message} of type {MessageType} to path {Path}", message, messageType, path); + Type messageType, + string path, + IDictionary messageHeaders) + => LogProducingMessageToPath(message, messageType, path); public virtual int? MaxMessagesPerTransaction => null; @@ -509,7 +508,7 @@ protected virtual TimeSpan GetDefaultRequestTimeout(Type requestType, ProducerSe if (producerSettings == null) throw new ArgumentNullException(nameof(producerSettings)); var timeout = producerSettings.Timeout ?? Settings.RequestResponse.Timeout; - _logger.LogDebug("Applying default timeout {MessageTimeout} for message type {MessageType}", timeout, requestType); + LogApplyingDefaultTimeout(requestType, timeout); return timeout; } @@ -584,12 +583,12 @@ protected async internal virtual Task SendInternal SendInternal consumerContextProperties, IServiceProvider currentServiceProvider) { var createMessageScope = IsMessageScopeEnabled(consumerSettings, consumerContextProperties); - if (createMessageScope) { - _logger.LogDebug("Creating message scope for {Message} of type {MessageType}", message, message.GetType()); + LogCreatingScope(message, message.GetType()); } return new MessageScopeWrapper(currentServiceProvider ?? Settings.ServiceProvider, createMessageScope); } public virtual Task ProvisionTopology() => Task.CompletedTask; + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Error, + Message = "Could not auto start consumers")] + private partial void LogCouldNotStartConsumers(Exception ex); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Creating message scope for {Message} of type {MessageType}")] + private partial void LogCreatingScope(object message, Type messageType); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Publishing of request message failed")] + private partial void LogPublishOfRequestFailed(Exception ex); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Information, + Message = "Starting consumers for {BusName} bus...")] + private partial void LogStartingConsumers(string busName); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Information, + Message = "Started consumers for {BusName} bus")] + private partial void LogStartedConsumers(string busName); + + [LoggerMessage( + EventId = 5, + Level = LogLevel.Information, + Message = "Stopping consumers for {BusName} bus...")] + private partial void LogStoppingConsumers(string busName); + + [LoggerMessage( + EventId = 6, + Level = LogLevel.Information, + Message = "Stopped consumers for {BusName} bus")] + private partial void LogStoppedConsumers(string busName); + + [LoggerMessage( + EventId = 7, + Level = LogLevel.Information, + Message = "Creating consumers for {BusName} bus...")] + private partial void LogCreatingConsumers(string busName); + + [LoggerMessage( + EventId = 8, + Level = LogLevel.Information, + Message = "Destroying consumers for {BusName} bus...")] + private partial void LogDestroyingConsumers(string busName); + + [LoggerMessage( + EventId = 9, + Level = LogLevel.Debug, + Message = "Applying default path {Path} for message type {MessageType}")] + private partial void LogApplyingDefaultPath(Type messageType, string path); + + [LoggerMessage( + EventId = 10, + Level = LogLevel.Debug, + Message = "Applying default timeout {MessageTimeout} for message type {MessageType}")] + private partial void LogApplyingDefaultTimeout(Type messageType, TimeSpan messageTimeout); + + [LoggerMessage( + EventId = 11, + Level = LogLevel.Debug, + Message = "Producing message {Message} of type {MessageType} to path {Path}")] + private partial void LogProducingMessageToPath(object message, Type messageType, string path); + + [LoggerMessage( + EventId = 12, + Level = LogLevel.Trace, + Message = "Added to PendingRequests, total is {RequestCount}")] + private partial void LogAddedToPendingRequests(int requestCount); + + [LoggerMessage( + EventId = 13, + Level = LogLevel.Debug, + Message = "Sending request message {MessageType} to path {Path} with reply to {ReplyTo}")] + private partial void LogSendingRequestMessage(string path, Type messageType, string replyTo); + + [LoggerMessage( + EventId = 14, + Level = LogLevel.Debug, + Message = "Skipping sending response {Response} of type {MessageType} as the header {HeaderName} is missing for RequestId: {RequestId}")] + private partial void LogSkippingSendingResponseMessage(string requestId, object response, Type messageType, string headerName); + + [LoggerMessage( + EventId = 15, + Level = LogLevel.Debug, + Message = "Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...")] + private partial void LogSendingResponseMessage(string requestId, object response, Type messageType); + + #endregion +} + +#if NETSTANDARD2_0 +public abstract partial class MessageBusBase +{ + private partial void LogCouldNotStartConsumers(Exception ex) + => _logger.LogError(ex, "Could not auto start consumers"); + + private partial void LogCreatingScope(object message, Type messageType) + => _logger.LogDebug("Creating message scope for {Message} of type {MessageType}", message, messageType); + + private partial void LogPublishOfRequestFailed(Exception ex) + => _logger.LogDebug(ex, "Publishing of request message failed"); + + private partial void LogStartingConsumers(string busName) + => _logger.LogInformation("Starting consumers for {BusName} bus...", busName); + + private partial void LogStartedConsumers(string busName) + => _logger.LogInformation("Started consumers for {BusName} bus", busName); + + private partial void LogStoppingConsumers(string busName) + => _logger.LogInformation("Stopping consumers for {BusName} bus...", busName); + + private partial void LogStoppedConsumers(string busName) + => _logger.LogInformation("Stopped consumers for {BusName} bus", busName); + + private partial void LogCreatingConsumers(string busName) + => _logger.LogInformation("Creating consumers for {BusName} bus...", busName); + + private partial void LogDestroyingConsumers(string busName) + => _logger.LogInformation("Destroying consumers for {BusName} bus...", busName); + + private partial void LogApplyingDefaultPath(Type messageType, string path) + => _logger.LogDebug("Applying default path {Path} for message type {MessageType}", path, messageType); + + private partial void LogApplyingDefaultTimeout(Type messageType, TimeSpan messageTimeout) + => _logger.LogDebug("Applying default timeout {MessageTimeout} for message type {MessageType}", messageTimeout, messageType); + + private partial void LogProducingMessageToPath(object message, Type messageType, string path) + => _logger.LogDebug("Producing message {Message} of type {MessageType} to path {Path}", message, messageType, path); + + private partial void LogAddedToPendingRequests(int requestCount) + => _logger.LogTrace("Added to PendingRequests, total is {RequestCount}", requestCount); + + private partial void LogSendingRequestMessage(string path, Type messageType, string replyTo) + => _logger.LogDebug("Sending request message {MessageType} to path {Path} with reply to {ReplyTo}", messageType, path, replyTo); + + private partial void LogSkippingSendingResponseMessage(string requestId, object response, Type messageType, string headerName) + => _logger.LogDebug("Skipping sending response {Response} of type {MessageType} as the header {HeaderName} is missing for RequestId: {RequestId}", response, messageType, headerName, requestId); + + private partial void LogSendingResponseMessage(string requestId, object response, Type messageType) + => _logger.LogDebug("Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...", response, messageType, requestId); } + +#endif \ No newline at end of file diff --git a/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs b/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs index a4731927..f37f0dcb 100644 --- a/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs +++ b/src/SlimMessageBus.Host/RequestResponse/PendingRequestManager.cs @@ -3,7 +3,7 @@ /// /// Manages the pending requests - ensure requests which exceeded the allotted timeout period are removed. /// -public class PendingRequestManager : IPendingRequestManager, IDisposable +public partial class PendingRequestManager : IPendingRequestManager, IDisposable { private readonly ILogger _logger; @@ -85,10 +85,31 @@ public virtual void CleanPendingRequests() if (canceled) { - _logger.LogDebug("Pending request timed-out: {RequestState}, now: {TimeNow}", requestState, now); + LogPendingRequestTimeout(now, requestState); _onRequestTimeout?.Invoke(requestState.Request); } } Store.RemoveAll(requestsToCancel.Select(x => x.Id)); - } + } + + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Debug, + Message = "Pending request timed-out: {RequestState}, now: {TimeNow}")] + private partial void LogPendingRequestTimeout(DateTimeOffset timeNow, PendingRequestState requestState); + + #endregion } + +#if NETSTANDARD2_0 + +public partial class PendingRequestManager +{ + private partial void LogPendingRequestTimeout(DateTimeOffset timeNow, PendingRequestState requestState) + => _logger.LogDebug("Pending request timed-out: {RequestState}, now: {TimeNow}", requestState, timeNow); +} + +#endif diff --git a/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs b/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs index 96f92fcf..73a0ef56 100644 --- a/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs +++ b/src/SlimMessageBus.Host/RequestResponse/PendingRequestState.cs @@ -24,12 +24,5 @@ public PendingRequestState(string id, object request, Type requestType, Type res CancellationToken = cancellationToken; } - #region Overrides of Object - - public override string ToString() - { - return $"Request(Id: {Id}, RequestType: {RequestType}, ResponseType: {ResponseType}, Created: {Created}, Expires: {Expires})"; - } - - #endregion + public override string ToString() => $"Request(Id: {Id}, RequestType: {RequestType}, ResponseType: {ResponseType}, Created: {Created}, Expires: {Expires})"; } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Services/MessageHeaderService.cs b/src/SlimMessageBus.Host/Services/MessageHeaderService.cs index f6877245..1357c55a 100644 --- a/src/SlimMessageBus.Host/Services/MessageHeaderService.cs +++ b/src/SlimMessageBus.Host/Services/MessageHeaderService.cs @@ -6,7 +6,7 @@ internal interface IMessageHeaderService void AddMessageTypeHeader(object message, IDictionary headers); } -internal class MessageHeaderService : IMessageHeaderService +internal partial class MessageHeaderService : IMessageHeaderService { private readonly ILogger _logger; private readonly MessageBusSettings _settings; @@ -35,14 +35,14 @@ public void AddMessageHeaders(IDictionary messageHeaders, IDicti if (producerSettings.HeaderModifier != null) { // Call header hook - _logger.LogTrace($"Executing producer {nameof(ProducerSettings.HeaderModifier)}"); + LogExecutingHeaderModifier("producer"); producerSettings.HeaderModifier(messageHeaders, message); } if (_settings.HeaderModifier != null) { // Call header hook - _logger.LogTrace($"Executing bus {nameof(MessageBusSettings.HeaderModifier)}"); + LogExecutingHeaderModifier("bus"); _settings.HeaderModifier(messageHeaders, message); } } @@ -54,5 +54,24 @@ public void AddMessageTypeHeader(object message, IDictionary hea headers.SetHeader(MessageHeaders.MessageType, _messageTypeResolver.ToName(message.GetType())); } } + + #region Logging + + [LoggerMessage( + EventId = 0, + Level = LogLevel.Trace, + Message = $"Executing {{ConfigLevel}} {nameof(ProducerSettings.HeaderModifier)}")] + private partial void LogExecutingHeaderModifier(string configLevel); + + #endregion +} + +#if NETSTANDARD2_0 + +internal partial class MessageHeaderService +{ + private partial void LogExecutingHeaderModifier(string configLevel) + => _logger.LogTrace($"Executing {{ConfigLevel}} {nameof(ProducerSettings.HeaderModifier)}", configLevel); } +#endif \ No newline at end of file diff --git a/src/SlimMessageBus/SlimMessageBus.csproj b/src/SlimMessageBus/SlimMessageBus.csproj index 569adb31..e6d6296f 100644 --- a/src/SlimMessageBus/SlimMessageBus.csproj +++ b/src/SlimMessageBus/SlimMessageBus.csproj @@ -3,7 +3,7 @@ - 3.0.0-rc902 + 3.0.0-rc904 This library provides a lightweight, easy-to-use message bus interface for .NET, offering a simplified facade for working with messaging brokers. It supports multiple transport providers for popular messaging systems, as well as in-memory (in-process) messaging for efficient local communication. diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs index 8003c373..74aa2b9c 100644 --- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs +++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs @@ -59,7 +59,7 @@ public CircuitBreakerAbstractConsumerInterceptorTests() { accessor = new CircuitBreakerAccessor(); - var h = new CircuitBreakerAbstractConsumerInterceptor(); + var h = new CircuitBreakerAbstractConsumerInterceptor(NullLogger.Instance); var serviceCollection = new ServiceCollection(); serviceCollection.TryAddSingleton(accessor); diff --git a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/AbstractMemoryBenchmark.cs b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/AbstractMemoryBenchmark.cs index a60f2d16..505ea7ee 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/AbstractMemoryBenchmark.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/AbstractMemoryBenchmark.cs @@ -1,29 +1,41 @@ namespace SlimMessageBus.Host.Memory.Benchmark; +using System.Reflection; + using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using SlimMessageBus.Host; -using System.Reflection; - public abstract class AbstractMemoryBenchmark : IDisposable { - protected ServiceProvider svp; - protected readonly IMessageBus bus; + private Lazy _serviceProvider; + + protected IServiceProvider ServiceProvider => _serviceProvider.Value; + + protected bool PerMessageScopeEnabled { get; set; } + + protected IMessageBus Bus => ServiceProvider.GetRequiredService(); protected AbstractMemoryBenchmark() { - var services = new ServiceCollection(); - - services.AddSlimMessageBus(mbb => mbb.WithProviderMemory().AutoDeclareFrom(Assembly.GetExecutingAssembly())); + _serviceProvider = new Lazy(() => + { + var services = new ServiceCollection(); - services.AddSingleton(); - services.AddTransient(); - Setup(services); + services.AddSlimMessageBus(mbb => mbb + .WithProviderMemory() + .AutoDeclareFrom(Assembly.GetExecutingAssembly()) + .PerMessageScopeEnabled(PerMessageScopeEnabled)); - svp = services.BuildServiceProvider(); + services.AddSingleton(NullLoggerFactory.Instance); + services.AddSingleton(); + services.AddTransient(); + Setup(services); - bus = svp.GetRequiredService(); + return services.BuildServiceProvider(); + }); } protected virtual void Setup(ServiceCollection services) @@ -32,10 +44,10 @@ protected virtual void Setup(ServiceCollection services) public void Dispose() { - if (svp != null) + if (_serviceProvider.Value != null) { - svp.Dispose(); - svp = null; + _serviceProvider.Value.Dispose(); + _serviceProvider = null; } } } diff --git a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/PubSubBenchmark.cs b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/PubSubBenchmark.cs index d9ae5834..b3b622e3 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/PubSubBenchmark.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/PubSubBenchmark.cs @@ -8,25 +8,21 @@ public abstract class PubSubBaseBenchmark : AbstractMemoryBenchmark { - private readonly TestResult testResult; - - public PubSubBaseBenchmark() - { - testResult = svp.GetRequiredService(); - } - protected override void Setup(ServiceCollection services) { services.AddSingleton(); services.AddTransient(); } - protected async Task RunTest(int messageCount) + protected async Task RunTest(int messageCount, bool createMessageScope) { + PerMessageScopeEnabled = createMessageScope; + var bus = Bus; var publishTasks = Enumerable.Range(0, messageCount).Select(x => bus.Publish(new SomeEvent(DateTimeOffset.Now, x))); await Task.WhenAll(publishTasks); + var testResult = ServiceProvider.GetRequiredService(); while (testResult.ArrivedCount < messageCount) { await Task.Yield(); @@ -38,8 +34,9 @@ protected async Task RunTest(int messageCount) public class PubSubBenchmark : PubSubBaseBenchmark { [Benchmark] - [Arguments(1000000)] - public Task PubSub(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task PubSub(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -53,8 +50,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task PubSubWithProducerInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task PubSubWithProducerInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -68,8 +66,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task PubSubWithPublishInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task PubSubWithPublishInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -83,8 +82,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task PubSubWithConsumerInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task PubSubWithConsumerInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } public record SomeEvent(DateTimeOffset Timestamp, long Id); diff --git a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/README.md b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/README.md new file mode 100644 index 00000000..ddee27b3 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/README.md @@ -0,0 +1,58 @@ +Sample Benchmark results + +``` +// * Summary * + +BenchmarkDotNet v0.14.0, Windows 11 (10.0.26100.2605) +12th Gen Intel Core i7-1260P, 1 CPU, 16 logical and 12 physical cores +.NET SDK 9.0.100 + [Host] : .NET 8.0.11 (8.0.1124.51707), X64 RyuJIT AVX2 + Job-SXUBYX : .NET 8.0.11 (8.0.1124.51707), X64 RyuJIT AVX2 + +MaxIterationCount=30 MaxWarmupIterationCount=10 + +``` + +| Type | Method | messageCount | createMessageScope | Mean | Error | StdDev | Gen0 | Gen1 | Gen2 | Allocated | +|--------------------------------------- |------------------------------ |------------- |------------------- |---------:|---------:|---------:|------------:|----------:|----------:|----------:| +| PubSubBenchmark | PubSub | 1000000 | False | 651.3 ms | 11.20 ms | 11.51 ms | 116000.0000 | 3000.0000 | 3000.0000 | 1.02 GB | +| PubSubWithConsumerInterceptorBenchmark | PubSubWithConsumerInterceptor | 1000000 | False | 729.8 ms | 14.52 ms | 12.12 ms | 144000.0000 | 3000.0000 | 3000.0000 | 1.26 GB | +| PubSubWithProducerInterceptorBenchmark | PubSubWithProducerInterceptor | 1000000 | False | 759.4 ms | 12.06 ms | 11.28 ms | 154000.0000 | 3000.0000 | 3000.0000 | 1.35 GB | +| PubSubWithPublishInterceptorBenchmark | PubSubWithPublishInterceptor | 1000000 | False | 752.2 ms | 10.63 ms | 9.94 ms | 154000.0000 | 3000.0000 | 3000.0000 | 1.35 GB | +| PubSubBenchmark | PubSub | 1000000 | True | 673.1 ms | 6.32 ms | 5.91 ms | 130000.0000 | 3000.0000 | 3000.0000 | 1.14 GB | +| PubSubWithConsumerInterceptorBenchmark | PubSubWithConsumerInterceptor | 1000000 | True | 769.8 ms | 10.16 ms | 9.01 ms | 157000.0000 | 3000.0000 | 3000.0000 | 1.38 GB | +| PubSubWithProducerInterceptorBenchmark | PubSubWithProducerInterceptor | 1000000 | True | 789.1 ms | 14.11 ms | 12.51 ms | 167000.0000 | 3000.0000 | 3000.0000 | 1.47 GB | +| PubSubWithPublishInterceptorBenchmark | PubSubWithPublishInterceptor | 1000000 | True | 802.6 ms | 9.42 ms | 7.87 ms | 167000.0000 | 3000.0000 | 3000.0000 | 1.47 GB | + +``` +// * Hints * +Outliers + PubSubBenchmark.PubSub: MaxIterationCount=30, MaxWarmupIterationCount=10 + -> 2 outliers were removed (689.29 ms, 692.70 ms) + PubSubWithConsumerInterceptorBenchmark.PubSubWithConsumerInterceptor: MaxIterationCount=30, MaxWarmupIterationCount=10 -> 2 outliers were removed (771.75 ms, 783.11 ms) + PubSubBenchmark.PubSub: MaxIterationCount=30, MaxWarmupIterationCount=10 + -> 1 outlier was detected (662.74 ms) + PubSubWithConsumerInterceptorBenchmark.PubSubWithConsumerInterceptor: MaxIterationCount=30, MaxWarmupIterationCount=10 -> 1 outlier was removed (797.48 ms) + PubSubWithProducerInterceptorBenchmark.PubSubWithProducerInterceptor: MaxIterationCount=30, MaxWarmupIterationCount=10 -> 1 outlier was removed (842.72 ms) + PubSubWithPublishInterceptorBenchmark.PubSubWithPublishInterceptor: MaxIterationCount=30, MaxWarmupIterationCount=10 -> 2 outliers were removed, 3 outliers were detected (786.09 ms, 821.03 ms, 827.84 ms) + +// * Legends * + messageCount : Value of the 'messageCount' parameter + createMessageScope : Value of the 'createMessageScope' parameter + Mean : Arithmetic mean of all measurements + Error : Half of 99.9% confidence interval + StdDev : Standard deviation of all measurements + Gen0 : GC Generation 0 collects per 1000 operations + Gen1 : GC Generation 1 collects per 1000 operations + Gen2 : GC Generation 2 collects per 1000 operations + Allocated : Allocated memory per single operation (managed only, inclusive, 1KB = 1024B) + 1 ms : 1 Millisecond (0.001 sec) + +// * Diagnostic Output - MemoryDiagnoser * + + +// ***** BenchmarkRunner: End ***** +Global total time: 00:03:23 (203.54 sec), executed benchmarks: 8 +// * Artifacts cleanup * +Artifacts cleanup is finished +``` diff --git a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/ReqRespBenchmark.cs b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/ReqRespBenchmark.cs index c3dfeb82..2c379f03 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/ReqRespBenchmark.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/ReqRespBenchmark.cs @@ -8,25 +8,21 @@ public abstract class ReqRespBaseBenchmark : AbstractMemoryBenchmark { - private readonly TestResult testResult; - - protected ReqRespBaseBenchmark() - { - testResult = svp.GetRequiredService(); - } - protected override void Setup(ServiceCollection services) { services.AddSingleton(); services.AddTransient(); } - public async Task RunTest(int messageCount) + public async Task RunTest(int messageCount, bool createMessageScope) { + PerMessageScopeEnabled = createMessageScope; + var bus = Bus; var sendRequests = Enumerable.Range(0, messageCount).Select(x => bus.Send(new SomeRequest(DateTimeOffset.Now, x))); await Task.WhenAll(sendRequests); + var testResult = ServiceProvider.GetRequiredService(); while (testResult.ArrivedCount < messageCount) { await Task.Yield(); @@ -38,8 +34,9 @@ public async Task RunTest(int messageCount) public class ReqRespBenchmark : ReqRespBaseBenchmark { [Benchmark] - [Arguments(1000000)] - public Task RequestResponse(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task RequestResponse(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -53,8 +50,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task ReqRespWithProducerInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task ReqRespWithProducerInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -68,8 +66,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task ReqRespWithSendInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task ReqRespWithSendInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -83,8 +82,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task ReqRespWithConsumerInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task ReqRespWithConsumerInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } [MemoryDiagnoser] @@ -98,8 +98,9 @@ protected override void Setup(ServiceCollection services) } [Benchmark] - [Arguments(1000000)] - public Task ReqRespWithRequestHandlerInterceptor(int messageCount) => RunTest(messageCount); + [Arguments(1000000, true)] + [Arguments(1000000, false)] + public Task ReqRespWithRequestHandlerInterceptor(int messageCount, bool createMessageScope) => RunTest(messageCount, createMessageScope); } public record SomeRequest(DateTimeOffset Timestamp, long Id) : IRequest; diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs index 6f0316d9..00a61811 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs @@ -507,6 +507,38 @@ public async Task When_Send_Given_AHandlerThatThrowsException_Then_ExceptionIsBu await act.Should().ThrowAsync(); } } + + [Fact] + public async Task When_Publish_Given_NoConsumerRegistered_Then_NoOp() + { + const string topic = "topic-a"; + + _builder.Produce(x => x.DefaultTopic(topic)); + + var request = new SomeRequest(Guid.NewGuid()); + + // act + Func act = () => _subject.Value.ProducePublish(request); + + // assert + await act.Should().NotThrowAsync(); + } + + [Fact] + public async Task When_Send_Given_NoHandlerRegistered_Then_ResponseIsNull() + { + const string topic = "topic-a"; + + _builder.Produce(x => x.DefaultTopic(topic)); + + var request = new SomeRequest(Guid.NewGuid()); + + // act + var response = await _subject.Value.ProduceSend(request); + + // assert + response.Should().BeNull(); + } } public record SomeMessageA(Guid Value); diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs index a05b74cf..280ed2f5 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs @@ -1,5 +1,4 @@ -namespace SlimMessageBus.Host.Test.Consumer; - +namespace SlimMessageBus.Host.Test.Consumer; public class AbstractConsumerTests { private class TestConsumer(ILogger logger, IEnumerable settings, IEnumerable interceptors) @@ -35,12 +34,21 @@ public AbstractConsumerTests() } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart) + [InlineData(true, false)] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, true)] + public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart, bool interceptorThrowsException) { - // Arrange - _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(canStart); + // Arrange + if (interceptorThrowsException) + { + _interceptor.Setup(x => x.CanStart(_target)).ThrowsAsync(new Exception()); + } + else + { + _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(canStart); + } // Act await _target.Start(); @@ -48,15 +56,38 @@ public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart) // Assert _target.IsStarted.Should().BeTrue(); - _interceptor.Verify(x => x.CanStart(_target), Times.Once); - _interceptor.Verify(x => x.Started(_target), canStart ? Times.Once : Times.Never); _interceptor.VerifyGet(x => x.Order, Times.Once); + _interceptor.Verify(x => x.CanStart(_target), Times.Once); + _interceptor.Verify(x => x.Started(_target), canStart || interceptorThrowsException ? Times.Once : Times.Never); _interceptor.VerifyNoOtherCalls(); - _targetMock.Verify(x => x.OnStart(), canStart ? Times.Once : Times.Never); + _targetMock.Verify(x => x.OnStart(), canStart || interceptorThrowsException ? Times.Once : Times.Never); _targetMock.VerifyNoOtherCalls(); } + [Fact] + public async Task When_Start_Givn_CalledConcurrently_Then_ItWillStartOnce() + { + // Arrange + _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(true); + + var startTasks = Enumerable.Range(0, 100).Select(_ => _target.Start()).ToArray(); + + // Act + await Task.WhenAll(startTasks); + + // Assert + _target.IsStarted.Should().BeTrue(); + + _interceptor.VerifyGet(x => x.Order, Times.Once); + _interceptor.Verify(x => x.CanStart(_target), Times.Once); + _interceptor.Verify(x => x.Started(_target), Times.Once); + _interceptor.VerifyNoOtherCalls(); + + _targetMock.Verify(x => x.OnStart(), Times.Once); + _targetMock.VerifyNoOtherCalls(); + } + [Theory] [InlineData(true)] [InlineData(false)] @@ -74,11 +105,11 @@ public async Task When_Stop_Then_Interceptor_CanStopIsCalled(bool canStop) // Assert _target.IsStarted.Should().BeFalse(); + _interceptor.VerifyGet(x => x.Order, Times.Once); _interceptor.Verify(x => x.CanStart(_target), Times.Once); _interceptor.Verify(x => x.CanStop(_target), Times.Once); _interceptor.Verify(x => x.Started(_target), Times.Once); _interceptor.Verify(x => x.Stopped(_target), canStop ? Times.Once : Times.Never); - _interceptor.VerifyGet(x => x.Order, Times.Once); _interceptor.VerifyNoOtherCalls(); _targetMock.Verify(x => x.OnStart(), Times.Once); diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/ResponseMessageProcessorTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/ResponseMessageProcessorTest.cs new file mode 100644 index 00000000..d82ff5e6 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/ResponseMessageProcessorTest.cs @@ -0,0 +1,151 @@ +namespace SlimMessageBus.Host.Test.Consumer; + +public class ResponseMessageProcessorTest +{ + private readonly RequestResponseSettings _settings; + private readonly Mock> _messageProviderMock; + private readonly Mock _pendingRequestStoreMock; + private readonly ResponseMessageProcessor _subject; + private readonly object _transportMessage; + private readonly Dictionary _messageHeaders; + + public ResponseMessageProcessorTest() + { + _settings = new RequestResponseSettings(); + _messageProviderMock = new Mock>(); + _pendingRequestStoreMock = new Mock(); + _subject = new ResponseMessageProcessor(NullLoggerFactory.Instance, + _settings, + _messageProviderMock.Object, + _pendingRequestStoreMock.Object, + new CurrentTimeProvider()); + _transportMessage = new object(); + _messageHeaders = []; + } + + [Fact] + public async Task When_ProcessMessage_Given_NoRequestIdHeader_Then_ExceptionResult() + { + // arrange + + // act + var r = await _subject.ProcessMessage(_transportMessage, _messageHeaders, null, null); + + // assert + r.Exception.Should().BeOfType(); + r.Response.Should().BeNull(); + r.Result.Should().Be(ProcessResult.Failure); + } + + [Fact] + public async Task When_ProcessMessage_Given_NonExistendRequestId_Then_ResponseIsNullAndNoError() + { + // arrange + _messageHeaders[ReqRespMessageHeaders.RequestId] = "requestId"; + + // act + var r = await _subject.ProcessMessage(_transportMessage, _messageHeaders, null, null); + + // assert + r.Exception.Should().BeNull(); + r.Response.Should().BeNull(); + r.Result.Should().Be(ProcessResult.Success); + } + + [Fact] + public async Task When_ProcessMessage_Given_ResponseIsFaulted_Then_ExceptionResult() + { + // arrange + var requestId = "requestId"; + var responseError = "the error"; + _messageHeaders[ReqRespMessageHeaders.RequestId] = requestId; + _messageHeaders[ReqRespMessageHeaders.Error] = responseError; + + var pendingRequestState = new PendingRequestState(requestId, + new object(), + typeof(object), + typeof(object), + DateTimeOffset.UtcNow, + DateTimeOffset.UtcNow.AddHours(2), + default); + + _pendingRequestStoreMock.Setup(x => x.GetById(requestId)).Returns(pendingRequestState); + + // act + var r = await _subject.ProcessMessage(_transportMessage, _messageHeaders, null, null); + + // assert + r.Exception.Should().BeNull(); + r.Response.Should().BeNull(); + r.Result.Should().Be(ProcessResult.Success); + + Func act = () => pendingRequestState.TaskCompletionSource.Task; + await act.Should().ThrowAsync().WithMessage(responseError); + } + + [Fact] + public async Task When_ProcessMessage_Given_ResponseArrivedOnTime_Then_TaskSourceIsResolved() + { + // arrange + var requestId = "requestId"; + var response = new object(); + + _messageProviderMock.Setup(x => x(response.GetType(), _transportMessage)).Returns(response); + + _messageHeaders[ReqRespMessageHeaders.RequestId] = requestId; + + var pendingRequestState = new PendingRequestState(requestId, + new object(), + typeof(object), + typeof(object), + DateTimeOffset.UtcNow, + DateTimeOffset.UtcNow.AddHours(2), + default); + + _pendingRequestStoreMock.Setup(x => x.GetById(requestId)).Returns(pendingRequestState); + + // act + var r = await _subject.ProcessMessage(_transportMessage, _messageHeaders, null, null); + + // assert + r.Exception.Should().BeNull(); + r.Response.Should().BeNull(); + r.Result.Should().Be(ProcessResult.Success); + + var responseReturned = await pendingRequestState.TaskCompletionSource.Task; + responseReturned.Should().BeSameAs(response); + } + + [Fact] + public async Task When_ProcessMessage_Given_ResponseCannotBeDeserialized_Then_TaskSourceIsException() + { + // arrange + var requestId = "requestId"; + var ex = new Exception("Boom!"); + + _messageProviderMock.Setup(x => x(typeof(object), _transportMessage)).Throws(ex); + + _messageHeaders[ReqRespMessageHeaders.RequestId] = requestId; + + var pendingRequestState = new PendingRequestState(requestId, + new object(), + typeof(object), + typeof(object), + DateTimeOffset.UtcNow, + DateTimeOffset.UtcNow.AddHours(2), + default); + + _pendingRequestStoreMock.Setup(x => x.GetById(requestId)).Returns(pendingRequestState); + + // act + var r = await _subject.ProcessMessage(_transportMessage, _messageHeaders, null, null); + + // assert + r.Exception.Should().BeNull(); + r.Response.Should().BeNull(); + r.Result.Should().Be(ProcessResult.Success); + + Func act = () => pendingRequestState.TaskCompletionSource.Task; + await act.Should().ThrowAsync(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs index 6a3350aa..482f0243 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Hybrid/HybridMessageBusTest.cs @@ -44,6 +44,7 @@ public HybridMessageBusTest() _serviceProviderMock.Setup(x => x.GetService(typeof(RuntimeTypeCache))).Returns(new RuntimeTypeCache()); _serviceProviderMock.Setup(x => x.GetService(typeof(IPendingRequestManager))).Returns(() => new PendingRequestManager(new InMemoryPendingRequestStore(), new CurrentTimeProvider(), NullLoggerFactory.Instance)); + _loggerMock.Setup(x => x.IsEnabled(It.IsAny())).Returns(true); _loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny())).Returns(_loggerMock.Object); _messageBusBuilder.AddChildBus("bus1", (mbb) => @@ -179,7 +180,7 @@ public async Task Given_UndeclareMessageType_When_Publish_Then_FollowsSettingsMo _loggerMock.Verify(x => x.Log( LogLevel.Information, It.IsAny(), - It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))), + It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type "))), It.IsAny(), It.IsAny>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never); } @@ -218,7 +219,7 @@ public async Task Given_UndeclaredRequestType_When_Send_Then_FollowsSettingsMode _loggerMock.Verify(x => x.Log( LogLevel.Information, It.IsAny(), - It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))), + It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type "))), It.IsAny(), It.IsAny>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never); } @@ -256,7 +257,7 @@ public async Task Given_UndeclaredRequestTypeWithoutResponse_When_Send_Then_Foll _loggerMock.Verify(x => x.Log( LogLevel.Information, It.IsAny(), - It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type: "))), + It.Is((x, _) => MoqMatchers.LogMessageMatcher(x, m => m.StartsWith("Could not find any bus that produces the message type "))), It.IsAny(), It.IsAny>()), mode == UndeclaredMessageTypeMode.RaiseOneTimeLog ? Times.Once : Times.Never); } diff --git a/src/Tests/SlimMessageBus.Host.Test/ReqestResponse/PendingRequestStateTest.cs b/src/Tests/SlimMessageBus.Host.Test/ReqestResponse/PendingRequestStateTest.cs new file mode 100644 index 00000000..2e4b2b17 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Test/ReqestResponse/PendingRequestStateTest.cs @@ -0,0 +1,21 @@ +namespace SlimMessageBus.Host.Test; + +public class PendingRequestStateTest +{ + [Fact] + public void When_ToString_Then_ReturnsExpectedValue() + { + // arrange + var request = new object(); + var requestId = "r1"; + var requestType = typeof(object); + var responseType = typeof(object); + var state = new PendingRequestState(requestId, request, requestType, responseType, DateTimeOffset.Now, DateTimeOffset.Now.AddSeconds(30), CancellationToken.None); + + // act + var result = state.ToString(); + + // assert + result.Should().StartWith($"Request(Id: {requestId}, RequestType: {requestType}, ResponseType: {responseType}"); + } +}