diff --git a/src/.editorconfig b/src/.editorconfig index 693b8598..a0368644 100644 --- a/src/.editorconfig +++ b/src/.editorconfig @@ -182,6 +182,7 @@ dotnet_style_qualification_for_field = false:suggestion dotnet_style_qualification_for_property = false:suggestion dotnet_style_qualification_for_method = false:suggestion dotnet_style_qualification_for_event = false:suggestion +dotnet_diagnostic.VSTHRD200.severity = none [*.{csproj,xml}] indent_style = space diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs index 5d36c4de..9dc3efcf 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs @@ -157,9 +157,10 @@ protected override async Task OnStop() if (_consumerTask == null) { throw new MessageBusException($"Consumer for group {Group} not yet started"); - } - - _consumerCts.Cancel(); + } + + await _consumerCts.CancelAsync(); + try { await _consumerTask.ConfigureAwait(false); diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index 984f2864..b79a7541 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -94,7 +94,7 @@ protected override void Build() .ToDictionary( x => x.Key, // Note: The consumers will first have IConsumer<>, then IRequestHandler<> - x => CreateMessageProcessor(x.OrderBy(consumerSettings => ConsumerModeOrder(consumerSettings)).ToList(), x.Key)); + x => CreateMessageProcessor([.. x.OrderBy(consumerSettings => ConsumerModeOrder(consumerSettings))], x.Key)); _messageProcessorQueueByPath = ProviderSettings.EnableBlockingPublish ? [] diff --git a/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs b/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs index 2b090254..bebc502f 100644 --- a/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs +++ b/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs @@ -10,23 +10,7 @@ protected override async Task OnStart() { _subscription ??= await connection.SubscribeCoreAsync(subject, cancellationToken: CancellationToken); - _messageConsumerTask = Task.Factory.StartNew(async () => - { - try - { - while (await _subscription.Msgs.WaitToReadAsync(CancellationToken)) - { - while (_subscription.Msgs.TryRead(out var msg)) - { - await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: CancellationToken).ConfigureAwait(false); - } - } - } - catch (OperationCanceledException ex) - { - Logger.LogInformation(ex, "Consumer task was cancelled"); - } - }, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); + _messageConsumerTask = Task.Factory.StartNew(OnLoop, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap(); } protected override async Task OnStop() @@ -41,5 +25,23 @@ protected override async Task OnStop() await _subscription.UnsubscribeAsync().ConfigureAwait(false); await _subscription.DisposeAsync(); } + } + + private async Task OnLoop() + { + try + { + while (await _subscription!.Msgs.WaitToReadAsync(CancellationToken)) + { + while (_subscription.Msgs.TryRead(out var msg)) + { + await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: CancellationToken).ConfigureAwait(false); + } + } + } + catch (OperationCanceledException ex) + { + Logger.LogInformation(ex, "Consumer task was cancelled"); + } } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs index c0b32c62..843a3f0f 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs @@ -23,15 +23,15 @@ internal class OutboxSendingTask( private Task _startBusTask; private Task _stopBusTask; - private int _busStartCount; - + private int _busStartCount; + private DateTime? _cleanupNextRun; private bool ShouldRunCleanup() { if (_outboxSettings.MessageCleanup?.Enabled == true) { - var trigger = _cleanupNextRun is null || DateTime.UtcNow > _cleanupNextRun.Value; + var trigger = !_cleanupNextRun.HasValue || DateTime.UtcNow > _cleanupNextRun.Value; if (trigger) { _cleanupNextRun = DateTime.UtcNow.Add(_outboxSettings.MessageCleanup.Interval); @@ -78,7 +78,10 @@ protected async Task Stop() { _logger.LogDebug("Outbox loop stopping..."); - _loopCts?.Cancel(); + if (_loopCts != null) + { + await _loopCts.CancelAsync(); + } if (_loopTask != null) { @@ -261,8 +264,7 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR { var busName = busGroup.Key; var bus = GetBus(compositeMessageBus, messageBusTarget, busName); - var bulkProducer = bus as IMessageBusBulkProducer; - if (bus == null || bulkProducer == null) + if (bus == null || bus is not IMessageBusBulkProducer bulkProducer) { foreach (var outboxMessage in busGroup) { diff --git a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs index dab3edb3..7ea8acab 100644 --- a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs +++ b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs @@ -12,10 +12,7 @@ public abstract class AbstractConsumer : IAsyncDisposable, IConsumerControl protected CancellationToken CancellationToken => _cancellationTokenSource.Token; - protected AbstractConsumer(ILogger logger) - { - Logger = logger; - } + protected AbstractConsumer(ILogger logger) => Logger = logger; public async Task Start() { @@ -29,7 +26,7 @@ public async Task Start() { if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) { - _cancellationTokenSource?.Cancel(); + _cancellationTokenSource?.Dispose(); _cancellationTokenSource = new CancellationTokenSource(); } @@ -53,7 +50,7 @@ public async Task Stop() _stopping = true; try { - _cancellationTokenSource.Cancel(); + await _cancellationTokenSource.CancelAsync(); await OnStop().ConfigureAwait(false); diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index d0782433..b4b4cf00 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -353,8 +353,8 @@ protected async virtual ValueTask DisposeAsyncCore() await Stop().ConfigureAwait(false); if (_cancellationTokenSource != null) - { - _cancellationTokenSource.Cancel(); + { + await _cancellationTokenSource.CancelAsync(); _cancellationTokenSource.Dispose(); _cancellationTokenSource = null; } diff --git a/src/SlimMessageBus.Host/PlatformExtensions.cs b/src/SlimMessageBus.Host/PlatformExtensions.cs new file mode 100644 index 00000000..849cf970 --- /dev/null +++ b/src/SlimMessageBus.Host/PlatformExtensions.cs @@ -0,0 +1,18 @@ +namespace SlimMessageBus.Host; + +using System.Diagnostics.CodeAnalysis; + +/// +/// A set of platform extensions to backfill functionality for some of the missing API prior in .NET 8.0. +/// +public static class PlatformExtensions +{ +#if !NET8_0_OR_GREATER + [ExcludeFromCodeCoverage] + public static Task CancelAsync(this CancellationTokenSource cts) + { + cts.Cancel(); + return Task.CompletedTask; + } +#endif +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs index aa2a71e2..e4f7a680 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs @@ -93,7 +93,6 @@ public class ProcessMessagesTests private readonly Mock _mockMessageBusTarget; private readonly Mock _mockMasterMessageBus; private readonly Mock _mockMessageBusBulkProducer; - private readonly Mock> _mockLogger; private readonly OutboxSettings _outboxSettings; private readonly OutboxSendingTask _sut; @@ -104,7 +103,6 @@ public ProcessMessagesTests() _mockMessageBusTarget = new Mock(); _mockMasterMessageBus = new Mock(); _mockMessageBusBulkProducer = _mockMasterMessageBus.As(); - _mockLogger = new Mock>(); _outboxSettings = new OutboxSettings { @@ -178,8 +176,8 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised() outboxMessages[0].BusName = null; outboxMessages[7].BusName = null; - var knownBusCount = outboxMessages.Count(x => x.BusName != null); - + var knownBusCount = outboxMessages.Count(x => x.BusName != null); + _mockMessageBusTarget.SetupGet(x => x.Target).Returns((IMessageBusProducer)null); _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); diff --git a/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs b/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs index 93677b61..9c29d055 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Helpers/ReflectionUtilsTests.cs @@ -7,7 +7,9 @@ public void When_GenerateGetterFunc_Given_TaskOfT_Then_ResultOfTaskIsObtained() { // arrange var taskWithResult = Task.FromResult(1); +#pragma warning disable xUnit1031 // Do not use blocking task operations in test method var resultPropertyInfo = typeof(Task).GetProperty(nameof(Task.Result)); +#pragma warning restore xUnit1031 // Do not use blocking task operations in test method // act var getResultLambda = ReflectionUtils.GenerateGetterFunc(resultPropertyInfo); @@ -25,7 +27,7 @@ public async Task When_GenerateMethodCallToFunc_Given_ConsumerWithOnHandlerAsync var message = new SomeMessage(); var instanceType = typeof(IConsumer); - var consumerOnHandleMethodInfo = instanceType.GetMethod(nameof(IConsumer.OnHandle), new[] { typeof(SomeMessage) }); + var consumerOnHandleMethodInfo = instanceType.GetMethod(nameof(IConsumer.OnHandle), [typeof(SomeMessage)]); var consumerMock = new Mock>(); consumerMock.Setup(x => x.OnHandle(message)).Returns(Task.CompletedTask); @@ -53,7 +55,7 @@ public void When_GenerateGenericMethodCallToFunc_Given_GenericMethid_Then_Method var genericMethod = typeof(ClassWithGenericMethod).GetMethods().FirstOrDefault(x => x.Name == nameof(ClassWithGenericMethod.GenericMethod)); // act - var methodOfTypeBoolFunc = ReflectionUtils.GenerateGenericMethodCallToFunc>(genericMethod, new[] { typeof(bool) }, obj.GetType(), typeof(object)); + var methodOfTypeBoolFunc = ReflectionUtils.GenerateGenericMethodCallToFunc>(genericMethod, [typeof(bool)], obj.GetType(), typeof(object)); var result = methodOfTypeBoolFunc(obj); // assert @@ -75,7 +77,9 @@ public async Task When_TaskOfObjectContinueWithTaskOfTypeFunc_Given_TaskOfObject typedTask.GetType().Should().BeAssignableTo(typeof(Task<>).MakeGenericType(typeof(int))); +#pragma warning disable xUnit1031 // Do not use blocking task operations in test method var resultFunc = ReflectionUtils.GenerateGetterFunc(typeof(Task).GetProperty(nameof(Task.Result))); +#pragma warning restore xUnit1031 // Do not use blocking task operations in test method var result = resultFunc(typedTask); result.Should().Be(10);