diff --git a/README.md b/README.md
index a5d018f5..fcbc9a23 100644
--- a/README.md
+++ b/README.md
@@ -93,6 +93,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| `.Host.Outbox.Sql` | Transactional Outbox using MSSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) |
| `.Host.Outbox.Sql.DbContext` | Transactional Outbox using MSSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) |
| `.Host.AsyncApi` | [AsyncAPI](https://www.asyncapi.com/) specification generation via [Saunter](https://github.com/tehmantra/saunter) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AsyncApi.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AsyncApi) |
+| `.Host.CircuitBreaker.HealthCheck` | Consumer circuit breaker based on [health checks](docs/intro.md#health-check-circuit-breaker) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.CircuitBreaker.HealthCheck.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.CircuitBreaker.HealthCheck) |
Typically the application layers (domain model, business logic) only need to depend on `SlimMessageBus` which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (`SlimMessageBus.Host.*`) which are the messaging transport providers and additional plugins.
diff --git a/build/tasks.ps1 b/build/tasks.ps1
index 649a062d..1af71ba1 100644
--- a/build/tasks.ps1
+++ b/build/tasks.ps1
@@ -43,6 +43,7 @@ $projects = @(
"SlimMessageBus.Host.Outbox.Sql",
"SlimMessageBus.Host.Outbox.Sql.DbContext",
+ "SlimMessageBus.Host.CircuitBreaker",
"SlimMessageBus.Host.CircuitBreaker.HealthCheck",
"SlimMessageBus.Host.AsyncApi"
diff --git a/docs/NuGet.md b/docs/NuGet.md
index 23b474ef..ce59ffb3 100644
--- a/docs/NuGet.md
+++ b/docs/NuGet.md
@@ -24,5 +24,6 @@ Plugins:
- Transactional Outbox pattern (SQL, DbContext)
- Serialization using JSON, Avro, ProtoBuf
- AsyncAPI specification generation
+- Consumer Circuit Breaker based on Health Checks
Find out more [https://github.com/zarusz/SlimMessageBus](https://github.com/zarusz/SlimMessageBus).
diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml
index ea2f607f..a1eda2f8 100644
--- a/src/Host.Plugin.Properties.xml
+++ b/src/Host.Plugin.Properties.xml
@@ -4,7 +4,7 @@
- 3.0.0-rc901
+ 3.0.0-rc902
\ No newline at end of file
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs
index 1da885ad..fdfca42f 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/AddConsumer.cs
@@ -1,17 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;
-public class AddConsumer : IConsumer
-{
- private readonly ILogger _logger;
-
- public AddConsumer(ILogger logger)
- {
- _logger = logger;
- }
-
+public class AddConsumer(ILogger logger) : IConsumer
+{
public Task OnHandle(Add message, CancellationToken cancellationToken)
{
- _logger.LogInformation("{A} + {B} = {C}", message.a, message.b, message.a + message.b);
+ logger.LogInformation("{A} + {B} = {C}", message.A, message.B, message.A + message.B);
return Task.CompletedTask;
}
}
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs
index 467a30d5..0d296333 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Consumers/SubtractConsumer.cs
@@ -1,17 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;
-public class SubtractConsumer : IConsumer
-{
- private readonly ILogger _logger;
-
- public SubtractConsumer(ILogger logger)
- {
- _logger = logger;
- }
-
+public class SubtractConsumer(ILogger logger) : IConsumer
+{
public Task OnHandle(Subtract message, CancellationToken cancellationToken)
{
- _logger.LogInformation("{A} - {B} = {C}", message.a, message.b, message.a - message.b);
+ logger.LogInformation("{A} - {B} = {C}", message.A, message.B, message.A - message.B);
return Task.CompletedTask;
}
}
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs
index 61fd0fa5..ac87444b 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs
@@ -14,4 +14,4 @@
global using SlimMessageBus;
global using SlimMessageBus.Host;
global using SlimMessageBus.Host.RabbitMQ;
-global using SlimMessageBus.Host.Serialization.SystemTextJson;
\ No newline at end of file
+global using SlimMessageBus.Host.Serialization.SystemTextJson;
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs
index b74784dd..93bc7607 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/AddRandomHealthCheck.cs
@@ -1,11 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;
-using Microsoft.Extensions.Logging;
-
-public class AddRandomHealthCheck : RandomHealthCheck
+public class AddRandomHealthCheck(ILogger logger) : RandomHealthCheck(logger)
{
- public AddRandomHealthCheck(ILogger logger)
- : base(logger)
- {
- }
}
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs
index cf9ccf88..a448c3ec 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/RandomHealthCheck.cs
@@ -2,19 +2,12 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
-public abstract class RandomHealthCheck : IHealthCheck
-{
- private readonly ILogger _logger;
-
- protected RandomHealthCheck(ILogger logger)
- {
- _logger = logger;
- }
-
+public abstract class RandomHealthCheck(ILogger logger) : IHealthCheck
+{
public Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
var value = (HealthStatus)Random.Shared.Next(3);
- _logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", this.GetType(), value);
+ logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", GetType(), value);
return Task.FromResult(new HealthCheckResult(value, value.ToString()));
}
}
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs
index 8a68b0b1..27ce53e5 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/HealthChecks/SubtractRandomHealthCheck.cs
@@ -1,11 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;
-using Microsoft.Extensions.Logging;
-
-public class SubtractRandomHealthCheck : RandomHealthCheck
+public class SubtractRandomHealthCheck(ILogger logger) : RandomHealthCheck(logger)
{
- public SubtractRandomHealthCheck(ILogger logger)
- : base(logger)
- {
- }
}
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs
index 73110c15..7d887959 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/IntermittentMessagePublisher.cs
@@ -1,15 +1,8 @@
namespace Sample.CircuitBreaker.HealthCheck;
-public class IntermittentMessagePublisher : BackgroundService
+
+public class IntermittentMessagePublisher(ILogger logger, IMessageBus messageBus)
+ : BackgroundService
{
- private readonly ILogger _logger;
- private readonly IMessageBus _messageBus;
-
- public IntermittentMessagePublisher(ILogger logger, IMessageBus messageBus)
- {
- _logger = logger;
- _messageBus = messageBus;
- }
-
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
@@ -17,11 +10,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var a = Random.Shared.Next(10);
var b = Random.Shared.Next(10);
- //_logger.LogInformation("Emitting {A} +- {B} = ?", a, b);
+ logger.LogInformation("Emitting {A} +- {B} = ?", a, b);
await Task.WhenAll(
- _messageBus.Publish(new Add(a, b)),
- _messageBus.Publish(new Subtract(a, b)),
+ messageBus.Publish(new Add(a, b), cancellationToken: stoppingToken),
+ messageBus.Publish(new Subtract(a, b), cancellationToken: stoppingToken),
Task.Delay(1000, stoppingToken));
}
}
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs
index 97c5e418..2208622f 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs
@@ -1,3 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;
-public record Add(int a, int b);
\ No newline at end of file
+public record Add(int A, int B);
\ No newline at end of file
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs
index 51d2efc4..d491f605 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Subtract.cs
@@ -1,3 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;
-public record Subtract(int a, int b);
\ No newline at end of file
+public record Subtract(int A, int B);
\ No newline at end of file
diff --git a/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs b/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
index 716253f5..718af413 100644
--- a/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
+++ b/src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
@@ -1,9 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck;
+
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Sample.CircuitBreaker.HealthCheck.HealthChecks;
-using SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;
+using SlimMessageBus.Host.CircuitBreaker.HealthCheck;
public static class Program
{
diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs
index c784f1b9..1b978461 100644
--- a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs
+++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs
@@ -13,7 +13,6 @@ public abstract class SqsBaseConsumer : AbstractConsumer
public SqsMessageBus MessageBus { get; }
protected IMessageProcessor MessageProcessor { get; }
- protected string Path { get; }
protected ISqsHeaderSerializer HeaderSerializer { get; }
protected SqsBaseConsumer(
@@ -23,11 +22,13 @@ protected SqsBaseConsumer(
IMessageProcessor messageProcessor,
IEnumerable consumerSettings,
ILogger logger)
- : base(logger, consumerSettings)
+ : base(logger,
+ consumerSettings,
+ path,
+ messageBus.Settings.ServiceProvider.GetServices())
{
- MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
_clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider));
- Path = path ?? throw new ArgumentNullException(nameof(path));
+ MessageBus = messageBus;
MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
HeaderSerializer = messageBus.HeaderSerializer;
T GetSingleValue(Func selector, string settingName, T defaultValue = default)
diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs
index a1bd113c..762ba2fd 100644
--- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs
+++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs
@@ -3,6 +3,8 @@ namespace SlimMessageBus.Host.AzureEventHub;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
+using Microsoft.Extensions.DependencyInjection;
+
public class EhGroupConsumer : AbstractConsumer
{
private readonly EventProcessorClient _processorClient;
@@ -12,7 +14,10 @@ public class EhGroupConsumer : AbstractConsumer
public EventHubMessageBus MessageBus { get; }
public EhGroupConsumer(IEnumerable consumerSettings, EventHubMessageBus messageBus, GroupPath groupPath, Func partitionConsumerFactory)
- : base(messageBus.LoggerFactory.CreateLogger(), consumerSettings)
+ : base(messageBus.LoggerFactory.CreateLogger(),
+ consumerSettings,
+ groupPath.Path,
+ messageBus.Settings.ServiceProvider.GetServices())
{
_groupPath = groupPath ?? throw new ArgumentNullException(nameof(groupPath));
if (partitionConsumerFactory == null) throw new ArgumentNullException(nameof(partitionConsumerFactory));
diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs
index a601e50c..09a5db8f 100644
--- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs
+++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.AzureServiceBus.Consumer;
+using Microsoft.Extensions.DependencyInjection;
+
public abstract class AsbBaseConsumer : AbstractConsumer
{
private ServiceBusProcessor _serviceBusProcessor;
@@ -9,11 +11,19 @@ public abstract class AsbBaseConsumer : AbstractConsumer
protected IMessageProcessor MessageProcessor { get; }
protected TopicSubscriptionParams TopicSubscription { get; }
- protected AsbBaseConsumer(ServiceBusMessageBus messageBus, ServiceBusClient serviceBusClient, TopicSubscriptionParams subscriptionFactoryParams, IMessageProcessor messageProcessor, IEnumerable consumerSettings, ILogger logger)
- : base(logger ?? throw new ArgumentNullException(nameof(logger)), consumerSettings)
+ protected AsbBaseConsumer(ServiceBusMessageBus messageBus,
+ ServiceBusClient serviceBusClient,
+ TopicSubscriptionParams subscriptionFactoryParams,
+ IMessageProcessor messageProcessor,
+ IEnumerable consumerSettings,
+ ILogger logger)
+ : base(logger ?? throw new ArgumentNullException(nameof(logger)),
+ consumerSettings,
+ subscriptionFactoryParams.ToString(),
+ messageBus.Settings.ServiceProvider.GetServices())
{
- MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
- TopicSubscription = subscriptionFactoryParams ?? throw new ArgumentNullException(nameof(subscriptionFactoryParams));
+ MessageBus = messageBus;
+ TopicSubscription = subscriptionFactoryParams;
MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
T GetSingleValue(Func selector, string settingName)
diff --git a/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs b/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs
index cf19244c..4cd056a4 100644
--- a/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs
+++ b/src/SlimMessageBus.Host.AzureServiceBus/TopicSubscriptionParams.cs
@@ -1,15 +1,9 @@
namespace SlimMessageBus.Host.AzureServiceBus;
-public class TopicSubscriptionParams
+public class TopicSubscriptionParams(string path, string subscriptionName)
{
- public string Path { get; set; }
- public string SubscriptionName { get; set; }
-
- public TopicSubscriptionParams(string path, string subscriptionName)
- {
- Path = path;
- SubscriptionName = subscriptionName;
- }
+ public string Path { get; set; } = path;
+ public string SubscriptionName { get; set; } = subscriptionName;
public override string ToString()
=> SubscriptionName == null ? Path : $"{Path}/{SubscriptionName}";
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs
index c721f5dc..cdb92ff0 100644
--- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerBuilderExtensions.cs
@@ -1,6 +1,4 @@
-namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Config;
-
-using Microsoft.Extensions.DependencyInjection.Extensions;
+namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;
public static class ConsumerBuilderExtensions
{
@@ -32,16 +30,15 @@ public static T PauseOnDegradedHealthCheck(this T builder, params string[] ta
private static void RegisterHealthServices(AbstractConsumerBuilder builder)
{
- builder.ConsumerSettings.CircuitBreakers.TryAdd();
- builder.PostConfigurationActions.Add(
- services =>
- {
- services.TryAddSingleton();
- services.TryAddEnumerable(ServiceDescriptor.Singleton(sp => sp.GetRequiredService()));
- services.TryAdd(ServiceDescriptor.Singleton(sp => sp.GetRequiredService()));
- services.AddHostedService(sp => sp.GetRequiredService());
+ builder.AddConsumerCircuitBreakerType();
+ builder.PostConfigurationActions.Add(services =>
+ {
+ services.TryAddSingleton();
+ services.TryAddEnumerable(ServiceDescriptor.Singleton(sp => sp.GetRequiredService()));
+ services.TryAdd(ServiceDescriptor.Singleton(sp => sp.GetRequiredService()));
+ services.AddHostedService(sp => sp.GetRequiredService());
- services.TryAddSingleton();
- });
+ services.TryAddTransient();
+ });
}
}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsExtensions.cs
similarity index 70%
rename from src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs
rename to src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsExtensions.cs
index a2775f10..18bebac4 100644
--- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/SettingsExtensions.cs
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsExtensions.cs
@@ -1,9 +1,7 @@
namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;
-static internal class SettingsExtensions
+static internal class ConsumerSettingsExtensions
{
- private const string _key = nameof(HealthCheckCircuitBreaker);
-
public static T PauseOnDegraded(this T consumerSettings, params string[] tags)
where T : AbstractConsumerSettings
{
@@ -15,7 +13,6 @@ public static T PauseOnDegraded(this T consumerSettings, params string[] tags
dict[tag] = HealthStatus.Degraded;
}
}
-
return consumerSettings;
}
@@ -30,18 +27,9 @@ public static T PauseOnUnhealthy(this T consumerSettings, params string[] tag
dict[tag] = HealthStatus.Unhealthy;
}
}
-
return consumerSettings;
}
- static internal IDictionary HealthBreakerTags(this AbstractConsumerSettings consumerSettings)
- {
- if (!consumerSettings.Properties.TryGetValue(_key, out var rawValue) || rawValue is not IDictionary value)
- {
- value = new Dictionary();
- consumerSettings.Properties[_key] = value;
- }
-
- return value;
- }
+ static internal IDictionary HealthBreakerTags(this AbstractConsumerSettings consumerSettings)
+ => consumerSettings.GetOrCreate(ConsumerSettingsProperties.HealthStatusTags, () => []);
}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsProperties.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsProperties.cs
new file mode 100644
index 00000000..a9fa2e91
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/Config/ConsumerSettingsProperties.cs
@@ -0,0 +1,6 @@
+namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;
+
+static internal class ConsumerSettingsProperties
+{
+ static readonly internal ProviderExtensionProperty> HealthStatusTags = new("CircuitBreaker_HealthStatusTags");
+}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs
index 6d9be2c3..af29883e 100644
--- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/GlobalUsings.cs
@@ -1,7 +1,7 @@
global using System;
global using System.Diagnostics;
-
+
global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Diagnostics.HealthChecks;
global using Microsoft.Extensions.Hosting;
-global using Microsoft.Extensions.Logging;
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs
index 96bd55a0..a2cd303d 100644
--- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckBackgroundService.cs
@@ -38,10 +38,8 @@ public async Task PublishAsync(HealthReport report, CancellationToken cancellati
}
}
- public Task StartAsync(CancellationToken cancellationToken)
- {
- return Task.CompletedTask;
- }
+ public Task StartAsync(CancellationToken cancellationToken)
+ => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken)
{
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs
index 8797fc9e..52a5f33e 100644
--- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/HealthCheckCircuitBreaker.cs
@@ -1,5 +1,7 @@
-namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;
-
+namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck;
+
+using SlimMessageBus.Host.CircuitBreaker;
+
internal sealed class HealthCheckCircuitBreaker : IConsumerCircuitBreaker
{
private readonly IEnumerable _settings;
diff --git a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj
index 611abab7..7f63ccfb 100644
--- a/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj
+++ b/src/SlimMessageBus.Host.CircuitBreaker.HealthCheck/SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj
@@ -15,16 +15,13 @@
+
-
- <_Parameter1>SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test
-
-
- <_Parameter1>DynamicProxyGenAssembly2
-
+
+
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Circuit.cs b/src/SlimMessageBus.Host.CircuitBreaker/Circuit.cs
new file mode 100644
index 00000000..349530a4
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/Circuit.cs
@@ -0,0 +1,7 @@
+namespace SlimMessageBus.Host.CircuitBreaker;
+
+public enum Circuit
+{
+ Open,
+ Closed
+}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerBuilderExtensions.cs
new file mode 100644
index 00000000..e1258a05
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerBuilderExtensions.cs
@@ -0,0 +1,26 @@
+namespace SlimMessageBus.Host.CircuitBreaker;
+
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+public static class ConsumerBuilderExtensions
+{
+ public static T AddConsumerCircuitBreakerType(this T builder)
+ where T : AbstractConsumerBuilder
+ where TConsumerCircuitBreaker : IConsumerCircuitBreaker
+ {
+ if (builder is null)
+ {
+ throw new ArgumentNullException(nameof(builder));
+ }
+
+ var breakersTypes = builder.ConsumerSettings.GetOrCreate(ConsumerSettingsProperties.CircuitBreakerTypes, () => []);
+ breakersTypes.TryAdd();
+
+ builder.PostConfigurationActions.Add(services =>
+ {
+ services.TryAddEnumerable(ServiceDescriptor.Singleton());
+ });
+
+ return builder;
+ }
+}
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerSettingsProperties.cs b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerSettingsProperties.cs
new file mode 100644
index 00000000..5e441547
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/Config/ConsumerSettingsProperties.cs
@@ -0,0 +1,9 @@
+namespace SlimMessageBus.Host.CircuitBreaker;
+
+static internal class ConsumerSettingsProperties
+{
+ ///
+ /// to be used with the consumer.
+ ///
+ static readonly internal ProviderExtensionProperty> CircuitBreakerTypes = new("CircuitBreaker_Types");
+}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/GlobalUsings.cs b/src/SlimMessageBus.Host.CircuitBreaker/GlobalUsings.cs
new file mode 100644
index 00000000..a4d02342
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/GlobalUsings.cs
@@ -0,0 +1,2 @@
+global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
diff --git a/src/SlimMessageBus/IConsumerCircuitBreaker.cs b/src/SlimMessageBus.Host.CircuitBreaker/IConsumerCircuitBreaker.cs
similarity index 76%
rename from src/SlimMessageBus/IConsumerCircuitBreaker.cs
rename to src/SlimMessageBus.Host.CircuitBreaker/IConsumerCircuitBreaker.cs
index 27154566..453d149b 100644
--- a/src/SlimMessageBus/IConsumerCircuitBreaker.cs
+++ b/src/SlimMessageBus.Host.CircuitBreaker/IConsumerCircuitBreaker.cs
@@ -1,5 +1,5 @@
-namespace SlimMessageBus;
-
+namespace SlimMessageBus.Host.CircuitBreaker;
+
///
/// Circuit breaker to toggle consumer status on an external event.
///
@@ -9,9 +9,3 @@ public interface IConsumerCircuitBreaker
Task Subscribe(Func onChange);
void Unsubscribe();
}
-
-public enum Circuit
-{
- Open,
- Closed
-}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerExtensions.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerExtensions.cs
new file mode 100644
index 00000000..ebbc1b2f
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerExtensions.cs
@@ -0,0 +1,7 @@
+namespace SlimMessageBus.Host.CircuitBreaker;
+
+internal static class AbstractConsumerExtensions
+{
+ public static bool IsPaused(this AbstractConsumer consumer) => consumer.GetOrDefault(AbstractConsumerProperties.IsPaused, false);
+ public static void SetIsPaused(this AbstractConsumer consumer, bool isPaused) => AbstractConsumerProperties.IsPaused.Set(consumer, isPaused);
+}
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerProperties.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerProperties.cs
new file mode 100644
index 00000000..5daffa07
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/AbstractConsumerProperties.cs
@@ -0,0 +1,7 @@
+namespace SlimMessageBus.Host.CircuitBreaker;
+
+static internal class AbstractConsumerProperties
+{
+ static readonly internal ProviderExtensionProperty IsPaused = new("CircuitBreaker_IsPaused");
+ static readonly internal ProviderExtensionProperty?> Breakers = new("CircuitBreaker_Breakers");
+}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs
new file mode 100644
index 00000000..30658f06
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/Implementation/CircuitBreakerAbstractConsumerInterceptor.cs
@@ -0,0 +1,92 @@
+namespace SlimMessageBus.Host.CircuitBreaker;
+
+///
+/// Circuit breaker to toggle consumer status on an external events.
+///
+internal sealed class CircuitBreakerAbstractConsumerInterceptor : IAbstractConsumerInterceptor
+{
+ public int Order => 100;
+
+ public async Task CanStart(AbstractConsumer consumer)
+ {
+ var breakerTypes = consumer.Settings.SelectMany(x => x.GetOrDefault(ConsumerSettingsProperties.CircuitBreakerTypes, [])).ToHashSet();
+ if (breakerTypes.Count == 0)
+ {
+ // no breakers, allow to pass
+ return true;
+ }
+
+ var breakers = consumer.GetOrCreate(AbstractConsumerProperties.Breakers, () => [])!;
+
+ async Task BreakerChanged(Circuit state)
+ {
+ if (!consumer.IsStarted)
+ {
+ return;
+ }
+
+ var isPaused = consumer.IsPaused();
+ var shouldPause = state == Circuit.Closed || breakers.Exists(x => x.State == Circuit.Closed);
+ if (shouldPause != isPaused)
+ {
+ var path = consumer.Path;
+ var bus = consumer.Settings[0].MessageBusSettings.Name ?? "default";
+ if (shouldPause)
+ {
+ consumer.Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
+ await consumer.DoStop().ConfigureAwait(false);
+ }
+ else
+ {
+ consumer.Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
+ await consumer.DoStart().ConfigureAwait(false);
+ }
+ consumer.SetIsPaused(shouldPause);
+ }
+ }
+
+ var sp = consumer.Settings.Select(x => x.MessageBusSettings.ServiceProvider).First(x => x != null);
+ foreach (var breakerType in breakerTypes)
+ {
+ var breaker = (IConsumerCircuitBreaker)ActivatorUtilities.CreateInstance(sp, breakerType, consumer.Settings);
+ breakers.Add(breaker);
+
+ await breaker.Subscribe(BreakerChanged);
+ }
+
+ var isPaused = breakers.Exists(x => x.State == Circuit.Closed);
+ consumer.SetIsPaused(isPaused);
+ return !isPaused;
+ }
+
+ public async Task CanStop(AbstractConsumer consumer)
+ {
+ var breakers = consumer.GetOrDefault(AbstractConsumerProperties.Breakers, null);
+ if (breakers == null || breakers.Count == 0)
+ {
+ // no breakers, allow to pass
+ return true;
+ }
+
+ foreach (var breaker in breakers)
+ {
+ breaker.Unsubscribe();
+
+ if (breaker is IAsyncDisposable asyncDisposable)
+ {
+ await asyncDisposable.DisposeAsync();
+ }
+ else if (breaker is IDisposable disposable)
+ {
+ disposable.Dispose();
+ }
+ }
+ breakers.Clear();
+
+ return !consumer.IsPaused();
+ }
+
+ public Task Started(AbstractConsumer consumer) => Task.CompletedTask;
+
+ public Task Stopped(AbstractConsumer consumer) => Task.CompletedTask;
+}
diff --git a/src/SlimMessageBus.Host.CircuitBreaker/SlimMessageBus.Host.CircuitBreaker.csproj b/src/SlimMessageBus.Host.CircuitBreaker/SlimMessageBus.Host.CircuitBreaker.csproj
new file mode 100644
index 00000000..5b7fe10c
--- /dev/null
+++ b/src/SlimMessageBus.Host.CircuitBreaker/SlimMessageBus.Host.CircuitBreaker.csproj
@@ -0,0 +1,22 @@
+
+
+
+
+
+ Circuit breaker abstractions for SlimMessageBus
+ Toggle consumer on or off
+ icon.png
+
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs b/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs
index a04fd491..71260121 100644
--- a/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs
+++ b/src/SlimMessageBus.Host.Configuration/Settings/AbstractConsumerSettings.cs
@@ -23,11 +23,6 @@ public abstract class AbstractConsumerSettings : HasProviderExtensions
///
public int Instances { get; set; }
- ///
- /// to be used with the consumer.
- ///
- public TypeCollection CircuitBreakers { get; } = [];
-
protected AbstractConsumerSettings()
{
Instances = 1;
diff --git a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj
index 36463945..b01a3b49 100644
--- a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj
+++ b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj
@@ -6,7 +6,7 @@
Core configuration interfaces of SlimMessageBus
SlimMessageBus
SlimMessageBus.Host
- 3.0.0-rc900
+ 3.0.0-rc901
diff --git a/src/SlimMessageBus.Host.Configuration/TypeCollection.cs b/src/SlimMessageBus.Host.Configuration/TypeCollection.cs
index 04246947..f936bcfb 100644
--- a/src/SlimMessageBus.Host.Configuration/TypeCollection.cs
+++ b/src/SlimMessageBus.Host.Configuration/TypeCollection.cs
@@ -45,34 +45,25 @@ public bool TryAdd() where T : TInterface
public void Clear() => _innerList.Clear();
- public bool Contains() where T : TInterface
- {
- return _innerList.Contains(typeof(T));
- }
+ public bool Contains() where T : TInterface
+ => _innerList.Contains(typeof(T));
- public void CopyTo(Type[] array, int arrayIndex) => _innerList.CopyTo(array, arrayIndex);
+ public void CopyTo(Type[] array, int arrayIndex)
+ => _innerList.CopyTo(array, arrayIndex);
- public bool Remove() where T : TInterface
- {
- return _innerList.Remove(typeof(T));
- }
+ public bool Remove() where T : TInterface
+ => _innerList.Remove(typeof(T));
- public bool Remove(Type type)
- {
- return _innerList.Remove(type);
- }
+ public bool Remove(Type type)
+ => _innerList.Remove(type);
public int Count => _innerList.Count;
public bool IsReadOnly => false;
- public IEnumerator GetEnumerator()
- {
- return _innerList.GetEnumerator();
- }
+ public IEnumerator GetEnumerator()
+ => _innerList.GetEnumerator();
- IEnumerator IEnumerable.GetEnumerator()
- {
- return _innerList.GetEnumerator();
- }
+ IEnumerator IEnumerable.GetEnumerator()
+ => _innerList.GetEnumerator();
}
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj b/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj
index 97abd9cc..b98b06a5 100644
--- a/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj
+++ b/src/SlimMessageBus.Host.Interceptor/SlimMessageBus.Host.Interceptor.csproj
@@ -3,7 +3,7 @@
- 3.0.0-rc900
+ 3.0.0-rc901
Core interceptor interfaces of SlimMessageBus
SlimMessageBus
diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
index cfb70f0f..13278bf0 100644
--- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
+++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
@@ -15,8 +15,17 @@ public class KafkaGroupConsumer : AbstractConsumer, IKafkaCommitController
public string Group { get; }
public IReadOnlyCollection Topics { get; }
- public KafkaGroupConsumer(ILoggerFactory loggerFactory, KafkaMessageBusSettings providerSettings, IEnumerable consumerSettings, string group, IReadOnlyCollection topics, Func processorFactory)
- : base(loggerFactory.CreateLogger(), consumerSettings)
+ public KafkaGroupConsumer(ILoggerFactory loggerFactory,
+ KafkaMessageBusSettings providerSettings,
+ IEnumerable consumerSettings,
+ IEnumerable interceptors,
+ string group,
+ IReadOnlyCollection topics,
+ Func processorFactory)
+ : base(loggerFactory.CreateLogger(),
+ consumerSettings,
+ group,
+ interceptors)
{
ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings));
Group = group ?? throw new ArgumentNullException(nameof(group));
diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
index 1c1f68ba..5bc1d1aa 100644
--- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
+++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.Kafka;
+using Microsoft.Extensions.DependencyInjection;
+
using IProducer = Confluent.Kafka.IProducer;
using Message = Confluent.Kafka.Message;
@@ -64,7 +66,7 @@ protected override async Task CreateConsumers()
void AddGroupConsumer(IEnumerable consumerSettings, string group, IReadOnlyCollection topics, Func processorFactory)
{
_logger.LogInformation("Creating consumer group {ConsumerGroup}", group);
- AddConsumer(new KafkaGroupConsumer(LoggerFactory, ProviderSettings, consumerSettings, group, topics, processorFactory));
+ AddConsumer(new KafkaGroupConsumer(LoggerFactory, ProviderSettings, consumerSettings, interceptors: Settings.ServiceProvider.GetServices(), group, topics, processorFactory));
}
object MessageProvider(Type messageType, ConsumeResult transportMessage)
diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
index 102ee6d5..2002f298 100644
--- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
+++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
@@ -3,6 +3,8 @@
using System.Collections.Generic;
using System.Threading;
+using Microsoft.Extensions.DependencyInjection;
+
using MQTTnet.Extensions.ManagedClient;
public class MqttMessageBus : MessageBusBase
@@ -55,7 +57,7 @@ protected override async Task CreateConsumers()
void AddTopicConsumer(IEnumerable consumerSettings, string topic, IMessageProcessor messageProcessor)
{
_logger.LogInformation("Creating consumer for {Path}", topic);
- var consumer = new MqttTopicConsumer(LoggerFactory.CreateLogger(), consumerSettings, topic, messageProcessor);
+ var consumer = new MqttTopicConsumer(LoggerFactory.CreateLogger(), consumerSettings, interceptors: Settings.ServiceProvider.GetServices(), topic, messageProcessor);
AddConsumer(consumer);
}
@@ -84,7 +86,7 @@ void AddTopicConsumer(IEnumerable consumerSettings, st
AddTopicConsumer([Settings.RequestResponse], Settings.RequestResponse.Path, processor);
}
- var topics = Consumers.Cast().Select(x => new MqttTopicFilterBuilder().WithTopic(x.Topic).Build()).ToList();
+ var topics = Consumers.Cast().Select(x => new MqttTopicFilterBuilder().WithTopic(x.Path).Build()).ToList();
await _mqttClient.SubscribeAsync(topics).ConfigureAwait(false);
}
@@ -101,7 +103,7 @@ protected override async Task DestroyConsumers()
private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
- var consumer = Consumers.Cast().FirstOrDefault(x => x.Topic == arg.ApplicationMessage.Topic);
+ var consumer = Consumers.Cast().FirstOrDefault(x => x.Path == arg.ApplicationMessage.Topic);
if (consumer != null)
{
var headers = new Dictionary();
diff --git a/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs b/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs
index 7720782c..738abb8b 100644
--- a/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs
+++ b/src/SlimMessageBus.Host.Mqtt/MqttTopicConsumer.cs
@@ -3,12 +3,17 @@
public class MqttTopicConsumer : AbstractConsumer
{
public IMessageProcessor MessageProcessor { get; }
- public string Topic { get; }
- public MqttTopicConsumer(ILogger logger, IEnumerable consumerSettings, string topic, IMessageProcessor messageProcessor)
- : base(logger, consumerSettings)
+ public MqttTopicConsumer(ILogger logger,
+ IEnumerable consumerSettings,
+ IEnumerable interceptors,
+ string topic,
+ IMessageProcessor messageProcessor)
+ : base(logger,
+ consumerSettings,
+ topic,
+ interceptors)
{
- Topic = topic;
MessageProcessor = messageProcessor;
}
diff --git a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
index ec81318e..4a9681a6 100644
--- a/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
+++ b/src/SlimMessageBus.Host.Nats/NatsMessageBus.cs
@@ -1,5 +1,6 @@
namespace SlimMessageBus.Host.Nats;
+using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Primitives;
public class NatsMessageBus : MessageBusBase
@@ -78,7 +79,7 @@ protected override async Task CreateConsumers()
private void AddSubjectConsumer(IEnumerable consumerSettings, string subject, IMessageProcessor> processor)
{
_logger.LogInformation("Creating consumer for {Subject}", subject);
- var consumer = new NatsSubjectConsumer(LoggerFactory.CreateLogger>(), consumerSettings, subject, _connection, processor);
+ var consumer = new NatsSubjectConsumer(LoggerFactory.CreateLogger>(), consumerSettings, interceptors: Settings.ServiceProvider.GetServices(), subject, _connection, processor);
AddConsumer(consumer);
}
diff --git a/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs b/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs
index 43a9f2cf..625dec57 100644
--- a/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs
+++ b/src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs
@@ -1,27 +1,31 @@
#nullable enable
namespace SlimMessageBus.Host.Nats;
-using System.Collections.Generic;
-
public class NatsSubjectConsumer : AbstractConsumer
{
- private readonly string _subject;
private readonly INatsConnection _connection;
private readonly IMessageProcessor> _messageProcessor;
private INatsSub? _subscription;
private Task? _messageConsumerTask;
- public NatsSubjectConsumer(ILogger logger, IEnumerable consumerSettings, string subject, INatsConnection connection, IMessageProcessor> messageProcessor)
- : base(logger, consumerSettings)
+ public NatsSubjectConsumer(ILogger logger,
+ IEnumerable consumerSettings,
+ IEnumerable interceptors,
+ string subject,
+ INatsConnection connection,
+ IMessageProcessor> messageProcessor)
+ : base(logger,
+ consumerSettings,
+ path: subject,
+ interceptors)
{
- _subject = subject;
_connection = connection;
_messageProcessor = messageProcessor;
}
protected override async Task OnStart()
{
- _subscription ??= await _connection.SubscribeCoreAsync(_subject, cancellationToken: CancellationToken);
+ _subscription ??= await _connection.SubscribeCoreAsync(Path, cancellationToken: CancellationToken);
_messageConsumerTask = Task.Factory.StartNew(OnLoop, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
}
diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs
index 47e4e630..05c40cc5 100644
--- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs
+++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/AbstractRabbitMqConsumer.cs
@@ -10,15 +10,21 @@ public abstract class AbstractRabbitMqConsumer : AbstractConsumer
private AsyncEventingBasicConsumer _consumer;
private string _consumerTag;
- public string QueueName { get; }
protected abstract RabbitMqMessageAcknowledgementMode AcknowledgementMode { get; }
- protected AbstractRabbitMqConsumer(ILogger logger, IEnumerable consumerSettings, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter)
- : base(logger, consumerSettings)
+ protected AbstractRabbitMqConsumer(ILogger logger,
+ IEnumerable consumerSettings,
+ IEnumerable interceptors,
+ IRabbitMqChannel channel,
+ string queueName,
+ IHeaderValueConverter headerValueConverter)
+ : base(logger,
+ consumerSettings,
+ path: queueName,
+ interceptors)
{
_channel = channel;
_headerValueConverter = headerValueConverter;
- QueueName = queueName;
}
protected override Task OnStart()
@@ -28,7 +34,7 @@ protected override Task OnStart()
lock (_channel.ChannelLock)
{
- _consumerTag = _channel.Channel.BasicConsume(QueueName, autoAck: AcknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit, _consumer);
+ _consumerTag = _channel.Channel.BasicConsume(Path, autoAck: AcknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit, _consumer);
}
return Task.CompletedTask;
@@ -54,7 +60,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve
return;
}
- Logger.LogDebug("Message arrived on queue {QueueName} from exchange {ExchangeName} with delivery tag {DeliveryTag}", QueueName, @event.Exchange, @event.DeliveryTag);
+ Logger.LogDebug("Message arrived on queue {QueueName} from exchange {ExchangeName} with delivery tag {DeliveryTag}", Path, @event.Exchange, @event.DeliveryTag);
Exception exception;
try
{
@@ -76,7 +82,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve
}
if (exception != null)
{
- Logger.LogError(exception, "Error while processing message on queue {QueueName} from exchange {ExchangeName}: {ErrorMessage}", QueueName, @event.Exchange, exception.Message);
+ Logger.LogError(exception, "Error while processing message on queue {QueueName} from exchange {ExchangeName}: {ErrorMessage}", Path, @event.Exchange, exception.Message);
}
}
diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs
index f2f18de2..f12ad27a 100644
--- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs
+++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs
@@ -56,7 +56,7 @@ public async Task ProcessMessage(BasicDeliverEventArgs tra
if (r.Exception != null)
{
// We rely on the IMessageProcessor to execute the ConsumerErrorHandler, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost.
- _logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, _consumer.QueueName, transportMessage, transportMessage.DeliveryTag);
+ _logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, _consumer.Path, transportMessage, transportMessage.DeliveryTag);
}
return r;
}
diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
index e9018b58..f6ed3ec5 100644
--- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
+++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
@@ -1,8 +1,10 @@
namespace SlimMessageBus.Host.RabbitMQ;
+using Microsoft.Extensions.DependencyInjection;
+
public interface IRabbitMqConsumer
{
- string QueueName { get; }
+ string Path { get; }
void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary properties, bool warnIfAlreadyConfirmed = false);
}
@@ -24,7 +26,12 @@ public RabbitMqConsumer(
MessageBusBase messageBus,
MessageProvider messageProvider,
IHeaderValueConverter headerValueConverter)
- : base(loggerFactory.CreateLogger(), consumers, channel, queueName, headerValueConverter)
+ : base(loggerFactory.CreateLogger(),
+ consumers,
+ messageBus.Settings.ServiceProvider.GetServices(),
+ channel,
+ queueName,
+ headerValueConverter)
{
_acknowledgementMode = consumers.Select(x => x.GetOrDefault(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
@@ -99,7 +106,7 @@ public void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessa
// Note: We want to makes sure the 1st message confirmation is handled
if (warnIfAlreadyConfirmed)
{
- Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, QueueName, transportMessage.DeliveryTag);
+ Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, Path, transportMessage.DeliveryTag);
}
return;
}
@@ -139,7 +146,7 @@ protected override async Task OnMessageReceived(Dictionary _messageProcessor;
@@ -8,6 +10,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer
public RabbitMqResponseConsumer(
ILoggerFactory loggerFactory,
+ IEnumerable interceptors,
IRabbitMqChannel channel,
string queueName,
RequestResponseSettings requestResponseSettings,
@@ -15,7 +18,13 @@ public RabbitMqResponseConsumer(
IPendingRequestStore pendingRequestStore,
ICurrentTimeProvider currentTimeProvider,
IHeaderValueConverter headerValueConverter)
- : base(loggerFactory.CreateLogger(), [requestResponseSettings], channel, queueName, headerValueConverter)
+
+ : base(loggerFactory.CreateLogger(),
+ [requestResponseSettings],
+ interceptors,
+ channel,
+ queueName,
+ headerValueConverter)
{
_messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider);
}
diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
index 5a354453..2e5d034c 100644
--- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
+++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
@@ -1,4 +1,7 @@
namespace SlimMessageBus.Host.RabbitMQ;
+
+using Microsoft.Extensions.DependencyInjection;
+
public class RabbitMqMessageBus : MessageBusBase, IRabbitMqChannel
{
private readonly ILogger _logger;
@@ -51,6 +54,7 @@ protected override async Task CreateConsumers()
if (Settings.RequestResponse != null)
{
AddConsumer(new RabbitMqResponseConsumer(LoggerFactory,
+ interceptors: Settings.ServiceProvider.GetServices(),
channel: this,
queueName: Settings.RequestResponse.GetQueueName(),
Settings.RequestResponse,
diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs
index 8105bb1b..466b772a 100644
--- a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs
+++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs
@@ -23,8 +23,17 @@ public QueueProcessors(string name, List>
}
}
- public RedisListCheckerConsumer(ILogger logger, IDatabase database, TimeSpan? pollDelay, TimeSpan maxIdle, IEnumerable<(string QueueName, IMessageProcessor Processor)> queues, IMessageSerializer envelopeSerializer)
- : base(logger, [])
+ public RedisListCheckerConsumer(ILogger logger,
+ IEnumerable interceptors,
+ IDatabase database,
+ TimeSpan? pollDelay,
+ TimeSpan maxIdle,
+ IEnumerable<(string QueueName, IMessageProcessor Processor)> queues,
+ IMessageSerializer envelopeSerializer)
+ : base(logger,
+ [],
+ path: string.Join("|", queues.Select(x => x.QueueName)),
+ interceptors)
{
_database = database;
_pollDelay = pollDelay;
diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs
index ad79f383..a336aee4 100644
--- a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs
+++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs
@@ -7,12 +7,18 @@ public class RedisTopicConsumer : AbstractConsumer, IRedisConsumer
private ChannelMessageQueue _channelMessageQueue;
private readonly IMessageProcessor _messageProcessor;
- public string Path { get; }
-
- public RedisTopicConsumer(ILogger logger, IEnumerable consumerSettings, string topic, ISubscriber subscriber, IMessageProcessor messageProcessor, IMessageSerializer envelopeSerializer)
- : base(logger, consumerSettings)
+ public RedisTopicConsumer(ILogger logger,
+ IEnumerable consumerSettings,
+ IEnumerable interceptors,
+ string topic,
+ ISubscriber subscriber,
+ IMessageProcessor messageProcessor,
+ IMessageSerializer envelopeSerializer)
+ : base(logger,
+ consumerSettings,
+ path: topic,
+ interceptors)
{
- Path = topic;
_messageProcessor = messageProcessor;
_envelopeSerializer = envelopeSerializer;
_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
index 1802eb3a..d7016aec 100644
--- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
+++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.Redis;
+using Microsoft.Extensions.DependencyInjection;
+
public class RedisMessageBus : MessageBusBase
{
private readonly ILogger _logger;
@@ -95,6 +97,7 @@ void AddTopicConsumer(IEnumerable consumerSettings, st
var consumer = new RedisTopicConsumer(
LoggerFactory.CreateLogger(),
consumerSettings,
+ interceptors: Settings.ServiceProvider.GetServices(),
topic,
subscriber,
messageProcessor,
@@ -152,7 +155,7 @@ void AddTopicConsumer(IEnumerable consumerSettings, st
if (queues.Count > 0)
{
- AddConsumer(new RedisListCheckerConsumer(LoggerFactory.CreateLogger(), Database, ProviderSettings.QueuePollDelay, ProviderSettings.QueuePollMaxIdle, queues, ProviderSettings.EnvelopeSerializer));
+ AddConsumer(new RedisListCheckerConsumer(LoggerFactory.CreateLogger(), Settings.ServiceProvider.GetServices(), Database, ProviderSettings.QueuePollDelay, ProviderSettings.QueuePollMaxIdle, queues, ProviderSettings.EnvelopeSerializer));
}
}
diff --git a/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj b/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj
index 16df001a..4a23fc6e 100644
--- a/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj
+++ b/src/SlimMessageBus.Host.Serialization/SlimMessageBus.Host.Serialization.csproj
@@ -3,7 +3,7 @@
- 3.0.0-rc900
+ 3.0.0-rc901
Core serialization interfaces of SlimMessageBus
SlimMessageBus
diff --git a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs
index 28745d69..0106d1e1 100644
--- a/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs
+++ b/src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs
@@ -1,48 +1,99 @@
namespace SlimMessageBus.Host;
-public abstract class AbstractConsumer : IAsyncDisposable, IConsumerControl
+public abstract class AbstractConsumer : HasProviderExtensions, IAsyncDisposable, IConsumerControl
{
private readonly SemaphoreSlim _semaphore;
- private readonly List _circuitBreakers;
-
+ private readonly IReadOnlyList _interceptors;
private CancellationTokenSource _cancellationTokenSource;
private bool _starting;
private bool _stopping;
- public bool IsPaused { get; private set; }
public bool IsStarted { get; private set; }
- protected ILogger Logger { get; }
- protected IReadOnlyList Settings { get; }
+ public string Path { get; }
+ public ILogger Logger { get; }
+ public IReadOnlyList Settings { get; }
protected CancellationToken CancellationToken => _cancellationTokenSource.Token;
- protected AbstractConsumer(ILogger logger, IEnumerable consumerSettings)
+ protected AbstractConsumer(ILogger logger,
+ IEnumerable consumerSettings,
+ string path,
+ IEnumerable interceptors)
{
_semaphore = new(1, 1);
- _circuitBreakers = [];
-
+ _interceptors = [.. interceptors.OrderBy(x => x.Order)];
Logger = logger;
- Settings = consumerSettings.ToList();
+ Settings = [.. consumerSettings];
+ Path = path;
}
- public async Task Start()
+ private async Task CallInterceptor(Func> func)
{
- async Task StartCircuitBreakers()
+ foreach (var interceptor in _interceptors)
{
- var types = Settings.SelectMany(x => x.CircuitBreakers).Distinct();
- if (!types.Any())
+ try
{
- return;
+ if (!await func(interceptor))
+ {
+ return false;
+ }
}
-
- var sp = Settings.Select(x => x.MessageBusSettings.ServiceProvider).FirstOrDefault(x => x != null);
- foreach (var type in types.Distinct())
+ catch (Exception e)
{
- var breaker = (IConsumerCircuitBreaker)ActivatorUtilities.CreateInstance(sp, type, Settings);
- _circuitBreakers.Add(breaker);
- await breaker.Subscribe(BreakerChanged);
+ Logger.LogError(e, "Interceptor {Interceptor} failed with error: {Error}", interceptor.GetType().Name, e.Message);
}
}
+ return true;
+ }
+
+ ///
+ /// Starts the underyling transport consumer (synchronized).
+ ///
+ ///
+ public async Task DoStart()
+ {
+ await _semaphore.WaitAsync();
+ try
+ {
+ await InternalOnStart();
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+ private async Task InternalOnStart()
+ {
+ await OnStart();
+ await CallInterceptor(async x => { await x.Started(this); return true; });
+ }
+
+ private async Task InternalOnStop()
+ {
+ await OnStop().ConfigureAwait(false);
+ await CallInterceptor(async x => { await x.Stopped(this); return true; });
+ }
+
+ ///
+ /// Stops the underyling transport consumer (synchronized).
+ ///
+ ///
+ public async Task DoStop()
+ {
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ await OnStop().ConfigureAwait(false);
+ await CallInterceptor(async x => { await x.Stopped(this); return true; });
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
+ public async Task Start()
+ {
if (IsStarted || _starting)
{
return;
@@ -58,11 +109,9 @@ async Task StartCircuitBreakers()
_cancellationTokenSource = new CancellationTokenSource();
}
- await StartCircuitBreakers();
- IsPaused = _circuitBreakers.Exists(x => x.State == Circuit.Closed);
- if (!IsPaused)
+ if (await CallInterceptor(x => x.CanStart(this)))
{
- await OnStart().ConfigureAwait(false);
+ await InternalOnStart();
}
IsStarted = true;
@@ -76,25 +125,6 @@ async Task StartCircuitBreakers()
public async Task Stop()
{
- async Task StopCircuitBreakers()
- {
- foreach (var breaker in _circuitBreakers)
- {
- breaker.Unsubscribe();
-
- if (breaker is IAsyncDisposable asyncDisposable)
- {
- await asyncDisposable.DisposeAsync();
- }
- else if (breaker is IDisposable disposable)
- {
- disposable.Dispose();
- }
- }
-
- _circuitBreakers.Clear();
- }
-
if (!IsStarted || _stopping)
{
return;
@@ -106,10 +136,9 @@ async Task StopCircuitBreakers()
{
await _cancellationTokenSource.CancelAsync();
- await StopCircuitBreakers();
- if (!IsPaused)
+ if (await CallInterceptor(x => x.CanStop(this)))
{
- await OnStop().ConfigureAwait(false);
+ await InternalOnStop();
}
IsStarted = false;
@@ -121,8 +150,17 @@ async Task StopCircuitBreakers()
}
}
- protected abstract Task OnStart();
- protected abstract Task OnStop();
+ ///
+ /// Initializes the transport specific consumer loop after the consumer has been started.
+ ///
+ ///
+ internal protected abstract Task OnStart();
+
+ ///
+ /// Destroys the transport specific consumer loop before the consumer is stopped.
+ ///
+ ///
+ internal protected abstract Task OnStop();
#region IAsyncDisposable
@@ -141,40 +179,4 @@ protected async virtual ValueTask DisposeAsyncCore()
}
#endregion
-
- async internal Task BreakerChanged(Circuit state)
- {
- await _semaphore.WaitAsync();
- try
- {
- if (!IsStarted)
- {
- return;
- }
-
- var shouldPause = state == Circuit.Closed || _circuitBreakers.Exists(x => x.State == Circuit.Closed);
- if (shouldPause != IsPaused)
- {
- var settings = Settings.Count > 0 ? Settings[0] : null;
- var path = settings?.Path ?? "[unknown path]";
- var bus = settings?.MessageBusSettings?.Name ?? "default";
- if (shouldPause)
- {
- Logger.LogWarning("Circuit breaker tripped for '{Path}' on '{Bus}' bus. Consumer paused.", path, bus);
- await OnStop().ConfigureAwait(false);
- }
- else
- {
- Logger.LogInformation("Circuit breaker restored for '{Path}' on '{Bus}' bus. Consumer resumed.", path, bus);
- await OnStart().ConfigureAwait(false);
- }
-
- IsPaused = shouldPause;
- }
- }
- finally
- {
- _semaphore.Release();
- }
- }
}
diff --git a/src/SlimMessageBus.Host/Consumer/IAbstractConsumerInterceptor.cs b/src/SlimMessageBus.Host/Consumer/IAbstractConsumerInterceptor.cs
new file mode 100644
index 00000000..953be4d9
--- /dev/null
+++ b/src/SlimMessageBus.Host/Consumer/IAbstractConsumerInterceptor.cs
@@ -0,0 +1,33 @@
+namespace SlimMessageBus.Host;
+
+///
+/// Interceptor for consumers that are of type .
+///
+public interface IAbstractConsumerInterceptor : IInterceptorWithOrder
+{
+ ///
+ /// Called to check if the consumer can be started.
+ ///
+ ///
+ /// True if the start is allowed
+ Task CanStart(AbstractConsumer consumer);
+
+ ///
+ /// Called to check if the consumer can be stopped.
+ ///
+ ///
+ /// True if the stop is allowed
+ Task CanStop(AbstractConsumer consumer);
+
+ ///
+ /// Called when the consumer is started.
+ ///
+ ///
+ Task Started(AbstractConsumer consumer);
+
+ ///
+ /// Called when the consumer is stopped.
+ ///
+ ///
+ Task Stopped(AbstractConsumer consumer);
+}
diff --git a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs
index 7378e100..8e3a4ae8 100644
--- a/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs
+++ b/src/SlimMessageBus.Host/DependencyResolver/ServiceCollectionExtensions.cs
@@ -26,7 +26,7 @@ public static IServiceCollection AddSlimMessageBus(this IServiceCollection servi
configure(mbb);
// Execute post config actions for the master bus and its children
- foreach (var postConfigure in mbb.PostConfigurationActions.Concat(mbb.Children.Values.SelectMany(x => x.PostConfigurationActions)))
+ foreach (var postConfigure in mbb.GetPostConfigurationActions())
{
postConfigure(services);
}
diff --git a/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj b/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj
index 1bbb7a7a..b07b9317 100644
--- a/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj
+++ b/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj
@@ -28,7 +28,7 @@
-
+
diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln
index 7c230bbb..ae2b40a4 100644
--- a/src/SlimMessageBus.sln
+++ b/src/SlimMessageBus.sln
@@ -259,7 +259,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Circuit
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test", "Tests\SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test\SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj", "{CA02D82E-DACC-4AB5-BD6B-071341E9E664}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.CircuitBreaker.HealthCheck", "Samples\Sample.CircuitBreaker.HealthCheck\Sample.CircuitBreaker.HealthCheck.csproj", "{226FC4F3-01EF-4214-9566-942CA0FB71B0}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.CircuitBreaker.HealthCheck", "Samples\Sample.CircuitBreaker.HealthCheck\Sample.CircuitBreaker.HealthCheck.csproj", "{226FC4F3-01EF-4214-9566-942CA0FB71B0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Outbox.Sql.Test", "Tests\SlimMessageBus.Host.Outbox.Sql.Test\SlimMessageBus.Host.Outbox.Sql.Test.csproj", "{CDF578D6-FE85-4A44-A99A-32490F047FDA}"
EndProject
@@ -282,6 +282,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AmazonS
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AmazonSQS.Test", "Tests\SlimMessageBus.Host.AmazonSQS.Test\SlimMessageBus.Host.AmazonSQS.Test.csproj", "{9255A33D-9697-4E69-9418-AD31656FF8AC}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.CircuitBreaker", "SlimMessageBus.Host.CircuitBreaker\SlimMessageBus.Host.CircuitBreaker.csproj", "{2FC8813B-D882-4B08-A886-5C6C6F8CB334}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.CircuitBreaker.Test", "Tests\SlimMessageBus.Host.CircuitBreaker.Test\SlimMessageBus.Host.CircuitBreaker.Test.csproj", "{B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -890,6 +894,22 @@ Global
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|Any CPU.Build.0 = Release|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.ActiveCfg = Release|Any CPU
{9255A33D-9697-4E69-9418-AD31656FF8AC}.Release|x86.Build.0 = Release|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Debug|x86.Build.0 = Debug|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|Any CPU.Build.0 = Release|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|x86.ActiveCfg = Release|Any CPU
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334}.Release|x86.Build.0 = Release|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Debug|x86.Build.0 = Debug|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|x86.ActiveCfg = Release|Any CPU
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -982,6 +1002,8 @@ Global
{9FCBF788-1F0C-43E2-909D-1F96B2685F38} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
{4DF4BC7C-5EE3-4310-BC40-054C1494444E} = {9291D340-B4FA-44A3-8060-C14743FB1712}
{9255A33D-9697-4E69-9418-AD31656FF8AC} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
+ {2FC8813B-D882-4B08-A886-5C6C6F8CB334} = {FE36338C-0DA2-499E-92CA-F9D5CE594B9F}
+ {B05BA0C5-8E47-4361-8C62-BC6B8682B7AA} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80}
diff --git a/src/SlimMessageBus/SlimMessageBus.csproj b/src/SlimMessageBus/SlimMessageBus.csproj
index c8a0c17f..569adb31 100644
--- a/src/SlimMessageBus/SlimMessageBus.csproj
+++ b/src/SlimMessageBus/SlimMessageBus.csproj
@@ -3,7 +3,7 @@
- 3.0.0-rc900
+ 3.0.0-rc902
This library provides a lightweight, easy-to-use message bus interface for .NET, offering a simplified facade for working with messaging brokers.
It supports multiple transport providers for popular messaging systems, as well as in-memory (in-process) messaging for efficient local communication.
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs
index 9434d91d..6bd34e2a 100644
--- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/GlobalUsings.cs
@@ -1,12 +1,7 @@
-global using System;
-global using System.Collections.Generic;
-global using System.Threading.Tasks;
-
-global using FluentAssertions;
-
-global using Microsoft.Extensions.Diagnostics.HealthChecks;
-global using Microsoft.Extensions.Logging.Abstractions;
-
-global using Moq;
-
+global using FluentAssertions;
+
+global using Microsoft.Extensions.Diagnostics.HealthChecks;
+
+global using Moq;
+
global using Xunit;
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs
index 35986e86..b83e0ab3 100644
--- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckBackgroundServiceTests.cs
@@ -1,5 +1,5 @@
-namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test;
-
+namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test;
+
public static class HealthCheckBackgroundServiceTests
{
public class AreEqualTests
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs
index 48499879..d7ce443a 100644
--- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/HealthCheckCircuitBreakerTests.cs
@@ -1,4 +1,7 @@
-namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test;
+namespace SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test;
+
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+
public class HealthCheckCircuitBreakerTests
{
private readonly Mock _hostMock;
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj
index 909dcfbe..74f0b935 100644
--- a/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test/SlimMessageBus.Host.CircuitBreaker.HealthCheck.Test.csproj
@@ -12,6 +12,7 @@
+
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/GlobalUsings.cs
new file mode 100644
index 00000000..820b49b3
--- /dev/null
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/GlobalUsings.cs
@@ -0,0 +1,8 @@
+global using FluentAssertions;
+
+global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.DependencyInjection.Extensions;
+global using Microsoft.Extensions.Logging;
+global using Microsoft.Extensions.Logging.Abstractions;
+
+global using Xunit;
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs
new file mode 100644
index 00000000..8003c373
--- /dev/null
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/HealthCheckCircuitBreakerAbstractConsumerInterceptorTests.cs
@@ -0,0 +1,140 @@
+namespace SlimMessageBus.Host.CircuitBreaker.Test;
+
+public class CircuitBreakerAbstractConsumerInterceptorTests
+{
+ private class TestConsumer(ILogger logger, IEnumerable settings, IEnumerable interceptors)
+ : AbstractConsumer(logger, settings, "path", interceptors)
+ {
+ protected override Task OnStart() => Task.CompletedTask;
+ protected override Task OnStop() => Task.CompletedTask;
+ }
+
+ private class TestConsumerSettings : AbstractConsumerSettings;
+
+ public class CircuitBreakerAccessor
+ {
+ public Circuit State { get; set; }
+ public int SubscribeCallCount { get; set; } = 0;
+ public int UnsubscribeCallCount { get; set; } = 0;
+ public Func? OnChange { get; set; }
+ }
+
+ private class TestCircuitBreaker : IConsumerCircuitBreaker
+ {
+ private readonly CircuitBreakerAccessor _accessor;
+
+ public TestCircuitBreaker(CircuitBreakerAccessor accessor, IEnumerable settings)
+ {
+ _accessor = accessor;
+ Settings = settings;
+ State = Circuit.Open;
+ }
+
+ public Circuit State
+ {
+ get => _accessor.State;
+ set => _accessor.State = value;
+ }
+ public IEnumerable Settings { get; }
+
+ public Task Subscribe(Func onChange)
+ {
+ _accessor.SubscribeCallCount++;
+ _accessor.OnChange = onChange;
+
+ return Task.CompletedTask;
+ }
+
+ public void Unsubscribe()
+ {
+ _accessor.UnsubscribeCallCount++;
+ }
+ }
+
+ private readonly List _settings;
+ private readonly TestConsumer _target;
+ private readonly CircuitBreakerAccessor accessor;
+
+ public CircuitBreakerAbstractConsumerInterceptorTests()
+ {
+ accessor = new CircuitBreakerAccessor();
+
+ var h = new CircuitBreakerAbstractConsumerInterceptor();
+
+ var serviceCollection = new ServiceCollection();
+ serviceCollection.TryAddSingleton(accessor);
+ serviceCollection.TryAddTransient();
+ serviceCollection.TryAddEnumerable(ServiceDescriptor.Singleton(h));
+
+ var testSettings = new TestConsumerSettings
+ {
+ MessageBusSettings = new MessageBusSettings { ServiceProvider = serviceCollection.BuildServiceProvider() }
+ };
+
+ var breakers = testSettings.GetOrCreate(ConsumerSettingsProperties.CircuitBreakerTypes, () => []);
+ breakers.Add();
+
+ _settings = [testSettings];
+
+ _target = new TestConsumer(NullLogger.Instance, _settings, [h]);
+ }
+
+ [Fact]
+ public async Task When_Start_ShouldStartCircuitBreakers_WhenNotStarted()
+ {
+ // Arrange
+
+ // Act
+ await _target.Start();
+
+ // Assert
+ _target.IsStarted.Should().BeTrue();
+ accessor.SubscribeCallCount.Should().Be(1);
+ }
+
+ [Fact]
+ public async Task When_Stop_ShouldStopCircuitBreakers_WhenStarted()
+ {
+ // Arrange
+ await _target.Start();
+
+ // Act
+ await _target.Stop();
+
+ // Assert
+ _target.IsStarted.Should().BeFalse();
+ accessor.UnsubscribeCallCount.Should().Be(1);
+ }
+
+ [Fact]
+ public async Task When_BreakerChanged_Should_PauseConsumer_Given_BreakerClosed()
+ {
+ // Arrange
+ await _target.Start();
+
+ // Act
+ _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeFalse();
+ accessor.State = Circuit.Closed;
+ await accessor.OnChange!(Circuit.Closed);
+
+ // Assert
+ _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task When_BreakerChanged_Should_ResumeConsumer_Given_BreakerOpen()
+ {
+ // Arrange
+ await _target.Start();
+ accessor.State = Circuit.Closed;
+ await accessor.OnChange!(Circuit.Open);
+
+ // Act
+ _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeTrue();
+ accessor.State = Circuit.Open;
+ await accessor.OnChange(Circuit.Open);
+
+ // Assert
+ _target.GetOrDefault(AbstractConsumerProperties.IsPaused, false).Should().BeFalse();
+ }
+}
diff --git a/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/SlimMessageBus.Host.CircuitBreaker.Test.csproj b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/SlimMessageBus.Host.CircuitBreaker.Test.csproj
new file mode 100644
index 00000000..0d44b542
--- /dev/null
+++ b/src/Tests/SlimMessageBus.Host.CircuitBreaker.Test/SlimMessageBus.Host.CircuitBreaker.Test.csproj
@@ -0,0 +1,20 @@
+
+
+
+
+
+ enable
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
+
diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs
index 258fd280..32e70db3 100644
--- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs
@@ -15,7 +15,7 @@ public KafkaGroupConsumerTests()
var providerSettings = new KafkaMessageBusSettings("host");
var consumerSettings = Array.Empty();
- var subjectMock = new Mock(loggerFactoryMock.Object, providerSettings, consumerSettings, "group", new List { "topic" }, processorFactoryMock.Object) { CallBase = true };
+ var subjectMock = new Mock(loggerFactoryMock.Object, providerSettings, consumerSettings, Array.Empty(), "group", new List { "topic" }, processorFactoryMock.Object) { CallBase = true };
_subject = subjectMock.Object;
}
diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs
index 5d49be44..a05b74cf 100644
--- a/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs
+++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/AbstractConsumerTests.cs
@@ -2,138 +2,87 @@
public class AbstractConsumerTests
{
- private class TestConsumer : AbstractConsumer
- {
- public TestConsumer(ILogger logger, IEnumerable settings)
- : base(logger, settings) { }
-
- protected override Task OnStart() => Task.CompletedTask;
- protected override Task OnStop() => Task.CompletedTask;
+ private class TestConsumer(ILogger logger, IEnumerable settings, IEnumerable interceptors)
+ : AbstractConsumer(logger, settings, path: "path", interceptors)
+ {
+ internal protected override Task OnStart() => Task.CompletedTask;
+ internal protected override Task OnStop() => Task.CompletedTask;
}
private class TestConsumerSettings : AbstractConsumerSettings;
- public class CircuitBreakerAccessor
- {
- public Circuit State { get; set; }
- public int SubscribeCallCount { get; set; } = 0;
- public int UnsubscribeCallCount { get; set; } = 0;
- public IEnumerable Settings { get; set; }
- public Func OnChange { get; set; }
- }
-
- private class TestCircuitBreaker : IConsumerCircuitBreaker
- {
- private readonly CircuitBreakerAccessor _accessor;
-
- public TestCircuitBreaker(CircuitBreakerAccessor accessor, IEnumerable settings)
- {
- _accessor = accessor;
- Settings = settings;
- State = Circuit.Open;
- }
-
- public Circuit State
- {
- get => _accessor.State;
- set => _accessor.State = value;
- }
- public IEnumerable Settings { get; }
-
- public Task Subscribe(Func onChange)
- {
- _accessor.SubscribeCallCount++;
- _accessor.OnChange = onChange;
-
- return Task.CompletedTask;
- }
-
- public void Unsubscribe()
- {
- _accessor.UnsubscribeCallCount++;
- }
- }
-
- private readonly List _settings;
- private readonly TestConsumer _target;
- private readonly CircuitBreakerAccessor accessor;
+ private readonly List _settings;
+ private readonly Mock _targetMock;
+ private readonly AbstractConsumer _target;
+ private readonly Mock _interceptor;
public AbstractConsumerTests()
{
- accessor = new CircuitBreakerAccessor();
+ _interceptor = new Mock();
var serviceCollection = new ServiceCollection();
- serviceCollection.TryAddSingleton(accessor);
- serviceCollection.TryAddTransient();
+ serviceCollection.TryAddEnumerable(ServiceDescriptor.Singleton(_interceptor.Object));
var testSettings = new TestConsumerSettings
{
MessageBusSettings = new MessageBusSettings { ServiceProvider = serviceCollection.BuildServiceProvider() }
};
- testSettings.CircuitBreakers.Add();
-
_settings = [testSettings];
- _target = new TestConsumer(NullLogger.Instance, _settings);
+ _targetMock = new Mock(NullLogger.Instance, _settings, "path", new IAbstractConsumerInterceptor[] { _interceptor.Object }) { CallBase = true };
+ _target = _targetMock.Object;
}
- [Fact]
- public async Task Start_ShouldStartCircuitBreakers_WhenNotStarted()
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task When_Start_Then_Interceptor_CanStartIsCalled(bool canStart)
{
// Arrange
+ _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(canStart);
// Act
await _target.Start();
// Assert
- _target.IsStarted.Should().BeTrue();
- accessor.SubscribeCallCount.Should().Be(1);
- }
-
- [Fact]
- public async Task Stop_ShouldStopCircuitBreakers_WhenStarted()
+ _target.IsStarted.Should().BeTrue();
+
+ _interceptor.Verify(x => x.CanStart(_target), Times.Once);
+ _interceptor.Verify(x => x.Started(_target), canStart ? Times.Once : Times.Never);
+ _interceptor.VerifyGet(x => x.Order, Times.Once);
+ _interceptor.VerifyNoOtherCalls();
+
+ _targetMock.Verify(x => x.OnStart(), canStart ? Times.Once : Times.Never);
+ _targetMock.VerifyNoOtherCalls();
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task When_Stop_Then_Interceptor_CanStopIsCalled(bool canStop)
{
// Arrange
- await _target.Start();
+ _interceptor.Setup(x => x.CanStart(_target)).ReturnsAsync(true);
+ _interceptor.Setup(x => x.CanStop(_target)).ReturnsAsync(canStop);
+
+ await _target.Start();
// Act
await _target.Stop();
// Assert
- _target.IsStarted.Should().BeFalse();
- accessor.UnsubscribeCallCount.Should().Be(1);
- }
-
- [Fact]
- public async Task BreakerChanged_ShouldPauseConsumer_WhenBreakerClosed()
- {
- // Arrange
- await _target.Start();
-
- // Act
- _target.IsPaused.Should().BeFalse();
- accessor.State = Circuit.Closed;
- await _target.BreakerChanged(Circuit.Closed);
-
- // Assert
- _target.IsPaused.Should().BeTrue();
- }
-
- [Fact]
- public async Task BreakerChanged_ShouldResumeConsumer_WhenBreakerOpen()
- {
- // Arrange
- await _target.Start();
- accessor.State = Circuit.Closed;
- await _target.BreakerChanged(Circuit.Open);
-
- // Act
- _target.IsPaused.Should().BeTrue();
- accessor.State = Circuit.Open;
- await _target.BreakerChanged(Circuit.Open);
-
- // Assert
- _target.IsPaused.Should().BeFalse();
+ _target.IsStarted.Should().BeFalse();
+
+ _interceptor.Verify(x => x.CanStart(_target), Times.Once);
+ _interceptor.Verify(x => x.CanStop(_target), Times.Once);
+ _interceptor.Verify(x => x.Started(_target), Times.Once);
+ _interceptor.Verify(x => x.Stopped(_target), canStop ? Times.Once : Times.Never);
+ _interceptor.VerifyGet(x => x.Order, Times.Once);
+ _interceptor.VerifyNoOtherCalls();
+
+ _targetMock.Verify(x => x.OnStart(), Times.Once);
+ _targetMock.Verify(x => x.OnStop(), canStop ? Times.Once : Times.Never);
+ _targetMock.VerifyNoOtherCalls();
}
}
diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
index a1aeba2e..4217e9a7 100644
--- a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
+++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
@@ -81,10 +81,10 @@ public void TriggerPendingRequestCleanup()
PendingRequestManager.CleanPendingRequests();
}
- public class MessageBusTestedConsumer(ILogger logger) : AbstractConsumer(logger, [])
+ public class MessageBusTestedConsumer(ILogger logger) : AbstractConsumer(logger, [], "path", [])
{
- protected override Task OnStart() => Task.CompletedTask;
+ internal protected override Task OnStart() => Task.CompletedTask;
- protected override Task OnStop() => Task.CompletedTask;
+ internal protected override Task OnStop() => Task.CompletedTask;
}
}