
Pragmatic.Messaging

Event-driven messaging for .NET 10 with zero reflection, AOT-safe handler pipelines, transactional outbox, saga orchestration, batch processing, and multi-transport support — all source-generated at compile time.

Messaging in .NET typically means MassTransit or NServiceBus. Both are powerful, but both rely on runtime assembly scanning for handler discovery, dynamic dispatch via GetType(), and runtime expression evaluation for middleware pipelines. Retry policies live in a separate Polly configuration. Sagas are discovered by reflection. The connection between a handler and its resilience configuration is implicit.

// Without Pragmatic (MassTransit-style): config scattered across files
services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>(); // Runtime discovery
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("order-created", e =>
        {
            e.UseMessageRetry(r => r.Intervals(200, 500, 1000)); // Separate from handler
            e.UseCircuitBreaker(cb => { cb.TrackingPeriod = TimeSpan.FromMinutes(1); });
            e.ConfigureConsumer<OrderCreatedConsumer>(context);
        });
    });
});

// The handler class has no idea about retry, circuit breaker, or timeout
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context) { ... }
}

With Pragmatic.Messaging, resilience is declared directly on the handler. The source generator produces the complete pipeline — retry loop, circuit breaker, timeout, idempotency, telemetry — as inline code at compile time.

[MessageHandler]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 200)]
[CircuitBreaker(FailureThreshold = 5, BreakDurationSeconds = 30)]
[Timeout(TimeoutSeconds = 60)]
public sealed partial class OrderCreatedHandler(
    IOrderService service) : IMessageHandler<OrderCreated>
{
    public async Task HandleAsync(OrderCreated message, MessageContext context, CancellationToken ct)
    {
        await service.ProcessAsync(message.OrderId, ct);
    }
}

The source generator (SG) emits OrderCreatedHandler.Pipeline.g.cs with the retry loop, circuit breaker state, timeout token, idempotency check, and telemetry, all inline: zero Polly, zero reflection.

dotnet add package Pragmatic.Messaging # Core + EF Core outbox
dotnet add package Pragmatic.Messaging.Core # Interfaces, attributes, InMemory bus

Add transport packages as needed:

dotnet add package Pragmatic.Messaging.Channels # In-process async
dotnet add package Pragmatic.Messaging.RabbitMQ # Distributed (AMQP)
dotnet add package Pragmatic.Messaging.Kafka # Event streaming

Add the source generator:

<ProjectReference Include="..\Pragmatic.SourceGenerator\src\Pragmatic.SourceGenerator\Pragmatic.SourceGenerator.csproj"
                  OutputItemType="Analyzer"
                  ReferenceOutputAssembly="false" />

// The message (a simple record)
public record OrderCreated(Guid OrderId, decimal Total, string CustomerId);

// The handler
[MessageHandler]
[Retry(MaxAttempts = 3)]
public sealed partial class OrderCreatedHandler(
    INotificationService notifications) : IMessageHandler<OrderCreated>
{
    public async Task HandleAsync(OrderCreated message, MessageContext context, CancellationToken ct)
    {
        await notifications.SendOrderConfirmationAsync(message.OrderId, ct);
    }
}

await PragmaticApp.RunAsync(args, app =>
{
    app.UseMessaging(msg =>
    {
        msg.UseChannels(ch => { ch.Capacity = 1000; ch.ConsumerCount = 2; });
        msg.EnableIdempotency();
    });
});

public class CheckoutService(IMessageBus bus)
{
    public async Task CompleteCheckoutAsync(Order order, CancellationToken ct)
    {
        await bus.PublishAsync(new OrderCreated(order.Id, order.Total, order.CustomerId), ct);
    }
}

The SG auto-discovers handlers by [MessageHandler]. No manual registration needed.


| Package | Description | Dependencies |
|---|---|---|
| Pragmatic.Messaging.Core | Interfaces, attributes, InMemory bus, routing, serializer | None |
| Pragmatic.Messaging | Meta-package + EF Core outbox (OutboxInterceptor, OutboxDeliveryService) | Core, EF Core |
| Pragmatic.Messaging.Channels | In-process transport via System.Threading.Channels | Core |
| Pragmatic.Messaging.RabbitMQ | RabbitMQ transport with connection pooling, auto-reconnect | Core |
| Pragmatic.Messaging.Kafka | Kafka transport with consumer groups, manual commit | Core |
| Pragmatic.Messaging.Saga | Saga orchestration: ISaga<T>, repositories, EF Core persistence | Core |
| Pragmatic.Messaging.Saga.EFCore | EF Core saga persistence (SagaInstance, SagaStep tables) | Saga |
| Pragmatic.Messaging.Batch | Batch dispatcher, splitter, progress tracking | Core |
| Pragmatic.Messaging.Auditing | Message audit trail with middleware | Core |
| Pragmatic.Messaging.Jobs | Scheduled message delivery via Pragmatic.Jobs | Core, Jobs |
| Pragmatic.Messaging.EFCore | EF Core stores: outbox source, idempotency, audit | Core, EF Core |
| Pragmatic.Messaging.Testing | MessageBusTestHarness for assertions in tests | Core |

IMessageHandler<T> is the core handler interface: one handler method, strongly typed, plus an optional Order for sequencing.

public interface IMessageHandler<in T>
{
    int Order => 0; // Execution order among handlers for same message type
    Task HandleAsync(T message, MessageContext context, CancellationToken ct = default);
}

[MessageHandler]
public sealed partial class ReservationConfirmedHandler(
    IEmailService email,
    ILogger<ReservationConfirmedHandler> logger) : IMessageHandler<ReservationConfirmed>
{
    public async Task HandleAsync(
        ReservationConfirmed message, MessageContext context, CancellationToken ct)
    {
        logger.LogInformation("Reservation {Id} confirmed", message.ReservationId);
        await email.SendConfirmationAsync(message.ReservationId, message.GuestEmail, ct);
    }
}

The class must be partial (PRAG0801 warning if not) and implement IMessageHandler<T> (PRAG0800 error if not).

When multiple handlers consume the same message type, the Order property controls execution sequence:

[MessageHandler(Order = 1)]
public sealed partial class AuditOrderHandler : IMessageHandler<OrderCreated>
{
    // Runs first
}

[MessageHandler(Order = 10)]
public sealed partial class ProcessOrderHandler : IMessageHandler<OrderCreated>
{
    // Runs after audit
}

Handlers are decorated with resilience attributes. The SG generates the complete pipeline at compile time.

[Retry] generates an inline retry loop with configurable backoff.

[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 200)]

| Property | Type | Default | Description |
|---|---|---|---|
| MaxAttempts | int | 3 | Maximum retry attempts |
| Strategy | BackoffStrategy | Exponential | Fixed, Exponential, ExponentialWithJitter |
| BaseDelayMs | int | 200 | Base delay between retries (ms) |
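
The table names the strategies but does not say how each one turns BaseDelayMs into a wait time. The following is a minimal sketch under stated assumptions: Fixed always waits the base delay, Exponential doubles it on every retry, and ExponentialWithJitter picks a uniformly random delay up to the exponential value. The enum and the Backoff.ComputeDelay helper are local stand-ins written for this illustration, not the code the generator emits.

```csharp
using System;

// Demo: the wait before each of three retries with a 200 ms base delay.
for (var attempt = 1; attempt <= 3; attempt++)
{
    var delay = Backoff.ComputeDelay(BackoffStrategy.Exponential, baseDelayMs: 200, attempt);
    Console.WriteLine($"retry {attempt}: wait {delay.TotalMilliseconds} ms");
}

// Stand-in mirroring the strategy names from the table (hypothetical type).
public enum BackoffStrategy { Fixed, Exponential, ExponentialWithJitter }

public static class Backoff
{
    // attempt is 1-based: 1 means the wait before the first retry.
    public static TimeSpan ComputeDelay(BackoffStrategy strategy, int baseDelayMs, int attempt)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Cap the exponent so the delay stays bounded for large attempt counts.
        var exponential = baseDelayMs * Math.Pow(2, Math.Min(attempt - 1, 20));

        var ms = strategy switch
        {
            BackoffStrategy.Fixed => baseDelayMs,
            BackoffStrategy.Exponential => exponential,
            // Assumption: "full jitter", a random value in [0, exponential).
            BackoffStrategy.ExponentialWithJitter => Random.Shared.NextDouble() * exponential,
            _ => throw new ArgumentOutOfRangeException(nameof(strategy)),
        };
        return TimeSpan.FromMilliseconds(ms);
    }
}
```

Under these assumptions the Exponential strategy waits 200, 400, and 800 ms before the three retries; jitter spreads those waits out so that many failing handlers do not retry in lockstep.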

[CircuitBreaker] keeps static per-handler circuit state, updated with Interlocked.

[CircuitBreaker(FailureThreshold = 5, BreakDurationSeconds = 30)]

| Property | Type | Default | Description |
|---|---|---|---|
| FailureThreshold | int | 5 | Consecutive failures before circuit opens |
| BreakDurationSeconds | int | 30 | Seconds circuit stays open before half-open probe |
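
To make "static circuit state using Interlocked" concrete, here is a rough sketch of a consecutive-failure breaker. The CircuitState class and its member names are hypothetical; the generated code may be structured quite differently. One simplification to note: once the break elapses, this version lets every caller through rather than a single half-open probe.

```csharp
using System;
using System.Threading;

// Demo: three consecutive failures open a circuit whose threshold is 3.
var now = DateTimeOffset.UtcNow;
Console.WriteLine($"before failures, allowed: {CircuitState.AllowRequest(now)}");
for (var i = 0; i < 3; i++)
    CircuitState.RecordFailure(now, failureThreshold: 3, breakDurationSeconds: 30);
Console.WriteLine($"after failures, allowed: {CircuitState.AllowRequest(now)}");

// Hypothetical per-handler state holder; one such class would exist per handler.
public static class CircuitState
{
    private static int _consecutiveFailures;
    private static long _openUntilUtcTicks; // 0 means the circuit is closed

    // Closed, or the break has elapsed: the caller may try the handler.
    public static bool AllowRequest(DateTimeOffset now)
        => now.UtcTicks >= Interlocked.Read(ref _openUntilUtcTicks);

    public static void RecordSuccess()
    {
        Interlocked.Exchange(ref _consecutiveFailures, 0);
        Interlocked.Exchange(ref _openUntilUtcTicks, 0);
    }

    public static void RecordFailure(DateTimeOffset now, int failureThreshold, int breakDurationSeconds)
    {
        // At or past the threshold, every further failure (re)opens the circuit.
        if (Interlocked.Increment(ref _consecutiveFailures) >= failureThreshold)
        {
            Interlocked.Exchange(
                ref _openUntilUtcTicks,
                now.AddSeconds(breakDurationSeconds).UtcTicks);
        }
    }
}
```

Because the failure counter is only reset on success, a failed probe after the break re-opens the circuit immediately, which approximates the half-open behaviour described in the table.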

[Timeout] enforces a deadline through a linked CancellationToken.

[Timeout(TimeoutSeconds = 60)]

| Property | Type | Default | Description |
|---|---|---|---|
| TimeoutSeconds | int | 30 | Maximum execution time per invocation |

All three attributes compose in a fixed order:

Timeout → Retry → CircuitBreaker → Handler

The timeout wraps the entire retry loop. Each retry attempt checks the circuit breaker before invoking the handler.
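
The nesting described above can be pictured with a hand-written equivalent. This is only a sketch: Pipeline.RunAsync and its parameters are invented for illustration, MaxAttempts is assumed to count total invocations, and the backoff delay between attempts is left out to keep the shape visible.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo: a handler that fails twice, then succeeds on the third attempt.
var calls = 0;
await Pipeline.RunAsync(
    handler: _ => ++calls < 3
        ? Task.FromException(new InvalidOperationException("transient"))
        : Task.CompletedTask,
    timeout: TimeSpan.FromSeconds(5),
    maxAttempts: 3,
    circuitAllows: () => true,
    ct: CancellationToken.None);
Console.WriteLine($"handler invoked {calls} times");

public static class Pipeline
{
    public static async Task RunAsync(
        Func<CancellationToken, Task> handler,
        TimeSpan timeout,
        int maxAttempts,
        Func<bool> circuitAllows,
        CancellationToken ct)
    {
        // Timeout: a single deadline that covers the whole retry loop.
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(timeout);

        for (var attempt = 1; ; attempt++)
        {
            timeoutCts.Token.ThrowIfCancellationRequested();

            // Circuit breaker: consulted before every attempt.
            if (!circuitAllows())
                throw new InvalidOperationException("Circuit is open.");

            try
            {
                await handler(timeoutCts.Token);
                return;
            }
            catch (Exception ex) when (ex is not OperationCanceledException && attempt < maxAttempts)
            {
                // Retry: swallow the failure and loop again.
            }
        }
    }
}
```

With the timeout outermost, slow retries cannot extend the total time budget; the last failed attempt propagates its exception to the caller.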


The IMessageBus interface supports three dispatch patterns:

public interface IMessageBus
{
    // Publish: fan-out to all handlers
    Task PublishAsync<T>(T message, CancellationToken ct = default) where T : notnull;
    Task PublishAsync<T>(T message, MessageContext context, CancellationToken ct = default) where T : notnull;

    // Send: point-to-point, single consumer
    Task SendAsync<T>(T message, CancellationToken ct = default) where T : notnull;
    Task SendAsync<T>(T message, MessageContext context, CancellationToken ct = default) where T : notnull;

    // Request/Reply
    Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest request, CancellationToken ct = default)
        where TRequest : notnull where TResponse : notnull;

    // Untyped dispatch (used by outbox delivery)
    Task DispatchAsync(object message, MessageContext context, CancellationToken ct = default);
}

| Method | Semantics | Use Case |
|---|---|---|
| PublishAsync | Fan-out (all handlers) | Domain events, notifications |
| SendAsync | Point-to-point (one handler) | Commands, task dispatch |

Every message carries metadata:

public sealed record MessageContext(
    string MessageId,                              // Unique per message instance
    string? CorrelationId,                         // Trace correlation
    string? TenantId,                              // Multi-tenant isolation
    string? UserId,                                // Originating user
    IReadOnlyDictionary<string, string>? Headers,  // Extensible
    int RetryCount,                                // 0 = first attempt
    string? BusName,                               // null = default bus
    string? SourceBoundary,                        // Producing boundary
    DateTimeOffset? EnqueuedAt);                   // Transport timestamp

// Create with auto-generated MessageId
var context = MessageContext.New(
    correlationId: "order-123",
    tenantId: "tenant-acme",
    headers: new Dictionary<string, string> { ["x-source"] = "api" });

await bus.PublishAsync(new OrderCreated(orderId, total, customerId), context);

The InMemory transport dispatches directly in-process, with no async decoupling. It is suitable for development and testing.

msg.UseInMemory(); // Explicit no-op — InMemory is the default

The Channels transport uses System.Threading.Channels for async decoupling with configurable backpressure.

msg.UseChannels(ch =>
{
    ch.Capacity = 1000;                        // Per-channel buffer size
    ch.ConsumerCount = 2;                      // Consumer tasks per subscription
    ch.FullMode = BoundedChannelFullMode.Wait; // Backpressure: block producer
});

Registers ChannelTransport as IMessageTransport and ChannelConsumerService as a hosted service. Includes ChannelTransportHealthCheck for health monitoring.
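
For readers unfamiliar with the underlying primitive, the sketch below shows what Capacity, ConsumerCount, and FullMode correspond to in plain System.Threading.Channels. It uses only the BCL and is not the ChannelTransport implementation; the capacity of 4 and the integer payload are arbitrary choices for the demo.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel: the buffer holds at most 4 items.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 4)
{
    FullMode = BoundedChannelFullMode.Wait, // a full buffer makes WriteAsync wait
});

var processed = 0;

// Two consumer tasks drain the same channel, analogous to ConsumerCount = 2.
var consumers = Enumerable.Range(0, 2).Select(_ => Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Interlocked.Increment(ref processed); // stand-in for invoking a handler
    }
})).ToArray();

for (var i = 0; i < 100; i++)
{
    // Completes immediately while there is room, otherwise waits for a consumer.
    await channel.Writer.WriteAsync(i);
}

channel.Writer.Complete(); // lets ReadAllAsync finish once the buffer is empty
await Task.WhenAll(consumers);

Console.WriteLine($"processed {processed} messages");
```

With BoundedChannelFullMode.Wait the producer slows down to the consumers' pace instead of dropping messages, which is the backpressure behaviour the configuration comment refers to.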

The RabbitMQ transport is full-featured, with connection pooling, auto-reconnect, topic exchange, manual ack/nack, and prefetch QoS.

msg.UseRabbitMq(rmq =>
{
    rmq.ConnectionString = "amqp://guest:guest@localhost:5672";
    rmq.ConsumerPrefetchCount = 10;    // QoS prefetch count
    rmq.AutoReconnect = true;          // Auto-reconnect on connection loss
    rmq.ReconnectBaseDelayMs = 1000;   // Initial reconnect delay
    rmq.MaxReconnectAttempts = 0;      // 0 = infinite
    rmq.DurableQueues = true;          // Survive broker restart
    rmq.PersistentMessages = true;     // Messages survive restart
    rmq.ExchangeType = "topic";        // Exchange type
});

Docker compose for development:

# docker-compose.messaging.yml
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    ports:
      - "5672:5672"
      - "15672:15672" # Management UI

Includes RabbitMqHealthCheck for health monitoring.

The Kafka transport targets high-throughput event streaming with consumer groups.

msg.UseKafka(k =>
{
    k.BootstrapServers = "localhost:9092";
    k.GroupId = "my-boundary";        // Consumer group
    k.EnableIdempotence = true;       // Idempotent producer: no duplicates from producer retries
    k.EnableAutoCommit = false;       // Manual commit after handler
    k.AutoOffsetReset = "earliest";   // Start from beginning
    k.MaxPollIntervalMs = 300000;     // 5-minute max poll interval
    k.SessionTimeoutMs = 45000;       // Session timeout
});

The transactional outbox ensures that domain events are published if and only if the database transaction commits. No distributed transactions needed.

Entity.RaiseDomainEvent(new OrderCreated(...))
  → SaveChangesAsync()
  → OutboxInterceptor writes to __OutboxMessages (same transaction)
  → Transaction commits or rolls back (entity changes and outbox rows together, atomically)
  → OutboxDeliveryService polls and delivers to transport

msg.EnableOutbox(outbox =>
{
    outbox.PollingIntervalSeconds = 5; // How often to poll for pending messages
    outbox.BatchSize = 100;            // Messages per delivery batch
    outbox.MaxRetries = 3;             // Delivery retry attempts
});

Add [EnableOutbox] to your boundary DbContext:

[EnableOutbox]
public class BookingDbContext : PragmaticDbContext { ... }

The SG generates:

  • OutboxSource implementation (IOutboxSource)
  • OutboxEntityTypeConfiguration for the __OutboxMessages table

For EF Core-backed stores (outbox source, idempotency, audit):

dotnet add package Pragmatic.Messaging.EFCore

Idempotency provides message deduplication at the handler level.

msg.EnableIdempotency();

Every handler checks IIdempotencyStore.TryMarkAsProcessedAsync(context.MessageId) before execution. Duplicate messages (same MessageId) are silently skipped.

| Store | Package | Use Case |
|---|---|---|
| InMemoryIdempotencyStore | Pragmatic.Messaging.Core | Development (default) |
| EfCoreIdempotencyStore | Pragmatic.Messaging.EFCore | Production |
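
The "try to mark, skip if already marked" check can be illustrated with a small in-memory store. The method name TryMarkAsProcessedAsync comes from the text above; its bool return type, the optional CancellationToken, and the SketchIdempotencyStore class itself are assumptions made for this example, not the library's InMemoryIdempotencyStore.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var store = new SketchIdempotencyStore();
var handled = 0;

// Deliver "msg-1" twice; only the first delivery should reach the handler.
foreach (var messageId in new[] { "msg-1", "msg-1", "msg-2" })
{
    if (await store.TryMarkAsProcessedAsync(messageId))
        handled++; // stand-in for invoking the real handler
}
Console.WriteLine($"handled {handled} of 3 deliveries");

// Hypothetical store: remembers every MessageId it has been shown.
public sealed class SketchIdempotencyStore
{
    private readonly ConcurrentDictionary<string, byte> _seen = new();

    // True only for the first caller presenting a given id; TryAdd is atomic,
    // so two concurrent deliveries of the same message cannot both win.
    public Task<bool> TryMarkAsProcessedAsync(string messageId, CancellationToken ct = default)
        => Task.FromResult(_seen.TryAdd(messageId, 0));
}
```

Here two of the three deliveries are handled. A production store would persist the ids and typically expire them, which is presumably why an EF Core-backed store is listed for production.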

Sagas coordinate long-running business processes as a sequence of events and actions with state tracking and compensation.

public enum CheckInState
{
    Initial,
    GuestVerified,
    RoomAssigned,
    Completed,
    Cancelled
}

[Saga<CheckInState>]
public partial class CheckInSaga : ISaga<CheckInState>
{
    public Guid Id { get; set; }
    public CheckInState State { get; set; }
    public string CorrelationId { get; set; } = "";
    public DateTimeOffset StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }

    // Saga-specific data
    public string? RoomNumber { get; set; }

    [SagaStart]
    public VerifyGuestAction Handle(GuestArrived @event)
    {
        return new VerifyGuestAction(@event.GuestId);
    }

    [InState(CheckInState.GuestVerified, NextState = CheckInState.RoomAssigned)]
    [CompensateWith<CancelCheckInAction>]
    public AssignRoomAction Handle(IdentityVerified @event)
    {
        return new AssignRoomAction(@event.GuestId);
    }

    [InState(CheckInState.RoomAssigned, NextState = CheckInState.Completed)]
    public object? Handle(RoomAssigned @event)
    {
        RoomNumber = @event.RoomNumber;
        CompletedAt = DateTimeOffset.UtcNow;
        return null; // Terminal: no further action
    }
}

public interface ISaga<TState> where TState : struct, Enum
{
    Guid Id { get; set; }
    TState State { get; set; }
    string CorrelationId { get; set; }
    DateTimeOffset StartedAt { get; set; }
    DateTimeOffset? CompletedAt { get; set; }
}

| Attribute | Target | Description |
|---|---|---|
| [Saga<TState>] | Class | Marks a saga with state enum |
| [SagaStart] | Method | Entry point handler (Initial state) |
| [InState(state, NextState)] | Method | Handler for a specific state |
| [CompensateWith<TAction>] | Method | Action dispatched on failure (rollback) |
| [SagaTimeout(Seconds)] | Class | Auto-cancel if not completed within timeout |

The SG generates {Saga}.Orchestrator.g.cs with:

  • State routing: switch on current state + incoming event type
  • Compensation chain: on failure, dispatches [CompensateWith] actions in reverse order
  • Transition validation: compile-time graph analysis from [InState] + NextState

msg.EnableSagas();           // InMemory repositories (dev)
msg.EnableSagaPersistence(); // EF Core repositories (prod)
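
State routing as "a switch on current state + incoming event type" might look roughly like the following for the check-in saga. This is a guess at the shape, not the generated orchestrator: SagaRouter is a made-up name, the event records are empty stand-ins, and the transition out of Initial is assumed to land in GuestVerified because that is the state the next handler expects.

```csharp
using System;

// Demo: walk the happy path of the check-in saga.
var state = CheckInState.Initial;
state = SagaRouter.Next(state, new GuestArrived());
state = SagaRouter.Next(state, new IdentityVerified());
state = SagaRouter.Next(state, new RoomAssigned());
Console.WriteLine(state);

public enum CheckInState { Initial, GuestVerified, RoomAssigned, Completed, Cancelled }

// Empty stand-ins for the event types used in the saga example.
public sealed record GuestArrived;
public sealed record IdentityVerified;
public sealed record RoomAssigned;

public static class SagaRouter
{
    // One arm per (state, event) pair declared via [SagaStart] / [InState].
    public static CheckInState Next(CheckInState current, object @event) => (current, @event) switch
    {
        (CheckInState.Initial, GuestArrived) => CheckInState.GuestVerified, // assumed start transition
        (CheckInState.GuestVerified, IdentityVerified) => CheckInState.RoomAssigned,
        (CheckInState.RoomAssigned, RoomAssigned) => CheckInState.Completed,
        _ => throw new InvalidOperationException($"No transition from {current} for this event."),
    };
}
```

Because every arm is known at compile time, a generator can also check the transition graph for unreachable states or missing entry points, which is what the saga diagnostics report.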

EF Core persistence uses two tables:

| Table | Purpose |
|---|---|
| __SagaInstances | Saga state, correlation, timestamps |
| __SagaSteps | Individual step execution history |

Batch processing splits large operations into individually tracked items, published via the message bus.

public record RecalculatePricesCommand(Guid[] ProductIds, string Reason);
public record RecalculatePriceBatch(Guid[] ProductIds);

public class PriceRecalculationSplitter
    : IBatchSplitter<RecalculatePricesCommand, RecalculatePriceBatch>
{
    public IReadOnlyList<RecalculatePriceBatch> Split(RecalculatePricesCommand command)
        => command.ProductIds
            .Chunk(100)
            .Select(chunk => new RecalculatePriceBatch(chunk))
            .ToList();
}

var batchId = await dispatcher.DispatchAsync(
    new RecalculatePricesCommand(productIds, "quarterly-review"),
    label: "Price recalculation Q1");

The BatchDispatcher<TBatch, TItem> splits the command, creates a BatchProgress entry, and publishes each item to the bus with batch headers (BatchId, ItemIndex, TotalItems).

[MessageHandler]
public sealed partial class PriceBatchHandler(
    IBatchProgressStore store) : IMessageHandler<RecalculatePriceBatch>
{
    public async Task HandleAsync(
        RecalculatePriceBatch batch, MessageContext context, CancellationToken ct)
    {
        try
        {
            // Process the batch...
            await BatchTracker.ReportSuccessAsync(context, store, ct);
        }
        catch (Exception)
        {
            await BatchTracker.ReportFailureAsync(context, store, ct);
            throw;
        }
    }
}

var progress = await store.GetProgressAsync(batchId);
// progress.Total = 2000
// progress.Completed = 1450
// progress.Failed = 12
// progress.Pending = 538
// progress.ProgressPercent = 73.1

msg.EnableBatchProcessing();  // InMemory progress store
msg.EnableBatchPersistence(); // EF Core progress store
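
The sample progress values are consistent with one particular reading: Pending is whatever has not yet been reported, and ProgressPercent counts both completed and failed items as processed ((1450 + 12) / 2000 = 73.1%). That reading is inferred from the numbers, not documented; the BatchProgressSketch record below is a hypothetical stand-in that reproduces the arithmetic.

```csharp
using System;

var progress = new BatchProgressSketch(Total: 2000, Completed: 1450, Failed: 12);
Console.WriteLine($"{progress.Pending} pending, {progress.ProgressPercent} percent processed");

// Hypothetical stand-in for the progress entry returned by the store.
public sealed record BatchProgressSketch(int Total, int Completed, int Failed)
{
    // Items that have reported neither success nor failure.
    public int Pending => Total - Completed - Failed;

    // Assumption: failed items count as processed; rounded to one decimal place.
    public double ProgressPercent =>
        Total == 0 ? 0 : Math.Round(100.0 * (Completed + Failed) / Total, 1);
}
```

With the values from the example this yields 538 pending items and 73.1 percent, matching the comments above.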

Request/Reply sends a point-to-point request and returns a typed response.

[RequestHandler]
public sealed partial class GetPriceHandler
    : IRequestHandler<GetPriceRequest, PriceResponse>
{
    public Task<PriceResponse> HandleAsync(
        GetPriceRequest request, MessageContext context, CancellationToken ct)
    {
        return Task.FromResult(new PriceResponse(request.ProductId, 29.99m));
    }
}

var response = await bus.RequestAsync<GetPriceRequest, PriceResponse>(
    new GetPriceRequest(productId));

Request/Reply uses queue semantics (single consumer). For cross-service calls, consider using RemoteBoundary instead, which provides typed HTTP invokers.


Isolate message flows by routing handlers to named buses.

[MessageHandler]
[OnBus("analytics")]
public sealed partial class PageViewedHandler : IMessageHandler<PageViewed>
{
    public async Task HandleAsync(PageViewed message, MessageContext context, CancellationToken ct)
    {
        // Runs on the "analytics" bus, isolated from business-critical traffic
    }
}

msg.AddBus("analytics", bus =>
{
    // Each named bus can have its own transport
    // bus.UseRabbitMq(rmq => { rmq.ConnectionString = "amqp://analytics-rmq:5672"; });
});

Handlers without [OnBus] consume from the default bus. Named buses are fully independent with their own transport and routing.


Auditing records lifecycle events for every message.

msg.EnableAuditing(a =>
{
    a.IncludePayload = false; // Exclude message body from audit entries (default: true)
});

AuditMiddleware (Order -100) wraps every handler and records:

  • Published / Handled / Failed / DeadLettered events
  • Duration, handler name, correlation ID, tenant

var entries = await auditStore.QueryAsync(new AuditQuery
{
    CorrelationId = "order-123",
    FromDate = DateTimeOffset.UtcNow.AddDays(-7)
});

| Store | Package | Use Case |
|---|---|---|
| InMemoryAuditStore | Pragmatic.Messaging.Auditing | Development (default) |
| EfCoreAuditStore | Pragmatic.Messaging.EFCore | Production |

Pragmatic.Messaging.Jobs enables future message delivery backed by Pragmatic.Jobs:

msg.EnableScheduledMessages(); // Requires Pragmatic.Messaging.Jobs

This registers IMessageScheduler:

public interface IMessageScheduler
{
    Task<Guid> ScheduleAsync<T>(T message, TimeSpan delay, CancellationToken ct = default)
        where T : notnull;
    Task<Guid> ScheduleAsync<T>(T message, DateTimeOffset scheduledAt, CancellationToken ct = default)
        where T : notnull;
    Task CancelAsync(Guid scheduleId, CancellationToken ct = default);
}

// Deliver in 24 hours
await scheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    delay: TimeSpan.FromHours(24));

// Deliver at specific time
await scheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    scheduledAt: checkout.AddHours(-2));

Internally, each scheduled message becomes a PublishMessageJob executed by the Jobs scheduler.


Pragmatic.Messaging.Testing provides MessageBusTestHarness — an in-memory bus that records all published messages without dispatching to handlers.

using FluentAssertions;            // Should() assertions
using Pragmatic.Messaging.Testing;
using Xunit;                       // [Fact]

public class OrderServiceTests
{
    [Fact]
    public async Task CompleteCheckout_PublishesOrderCreated()
    {
        // Arrange (order is an Order instance built for the test; construction omitted)
        var harness = new MessageBusTestHarness();
        var service = new CheckoutService(harness);

        // Act
        await service.CompleteCheckoutAsync(order, CancellationToken.None);

        // Assert
        harness.HasPublished<OrderCreated>().Should().BeTrue();
        harness.PublishedOf<OrderCreated>()
            .Should().ContainSingle(m => m.OrderId == order.Id);
    }
}

| Method | Description |
|---|---|
| Published | All recorded messages |
| PublishedOf<T>() | All messages of type T |
| HasPublished<T>() | True if any message of type T was published |
| HasPublished<T>(predicate) | True if any matching message was published |
| Reset() | Clear all recorded messages |

On the harness, RequestAsync records the request and then throws InvalidOperationException, because no handler can be resolved. Use a mock IRequestHandler<TRequest, TResponse> for request/reply testing.


Messages that exhaust all retry attempts are sent to the dead letter store for manual investigation.

public interface IDeadLetterStore
{
    Task StoreAsync(DeadLetterMessage message, CancellationToken ct = default);
    Task<IReadOnlyList<DeadLetterMessage>> GetAllAsync(CancellationToken ct = default);
}

| Store | Package | Use Case |
|---|---|---|
| InMemoryDeadLetterStore | Pragmatic.Messaging.Core | Development (default) |

To disable the default store (when providing a custom implementation):

msg.DisableInMemoryDeadLetter();

With Pragmatic.Composition, the SG auto-detects Pragmatic.Messaging via FeatureDetector and generates host registration:

// PragmaticHost.g.cs (auto-generated)
services.AddPragmaticMessaging();
services.AddPragmaticMessageHandlers();

The SG detects additional packages and registers them:

  • HasMessagingChannels → ChannelTransport
  • HasMessagingRabbitMq → RabbitMqTransport
  • HasMessagingAuditing → AuditMiddleware

ActivitySource: Pragmatic.Messaging

Handler spans include tags for message type, handler name, tenant, correlation ID, and retry count.

| Metric | Type | Description |
|---|---|---|
| pragmatic.messaging.published | Counter | Total messages published |
| pragmatic.messaging.handler_failures | Counter | Handler failures |
| pragmatic.messaging.handler_duration | Histogram (ms) | Handler execution duration |
| pragmatic.messaging.retry_attempts | Counter | Total retry attempts |
| pragmatic.messaging.circuit_breaker_trips | Counter | Circuit breaker open events |
| pragmatic.messaging.idempotency_duplicates | Counter | Duplicate messages skipped |
| pragmatic.messaging.dead_lettered | Counter | Messages sent to dead letter |
| pragmatic.messaging.outbox_delivery_duration | Histogram (ms) | Outbox delivery batch duration |

The source generator emits diagnostics for common mistakes:

| ID | Severity | Message |
|---|---|---|
| PRAG0800 | Error | [MessageHandler] on class not implementing IMessageHandler<T> |
| PRAG0801 | Warning | Handler should be partial |
| PRAG0802 | Error | [Retry] with MaxAttempts <= 0 |
| PRAG0803 | Error | [MessageMiddleware] on class not implementing IMessageMiddleware |
| PRAG0810 | Error | Inconsistent saga state transition |
| PRAG0811 | Info | Saga state has no handler (terminal states OK) |
| PRAG0812 | Warning | Unreachable saga states |
| PRAG0813 | Error | [Saga<T>] where T is not an enum |
| PRAG0814 | Error | Saga without [SagaStart] |
| PRAG0815 | Warning | Cross-boundary handler without [DependsOn] |
| PRAG0816 | Warning | Event type has no consumers |
| PRAG0817 | Error | Queue name collision |
| PRAG0818 | Warning | Cross-boundary handler without outbox |
| PRAG0830 | Error | [EnableOutbox] on non-DbContext class |
| PRAG0831 | Error | [EnableOutbox] without Pragmatic.Messaging.EFCore reference |

For a project with two handlers and one saga, the SG produces:

obj/GeneratedFiles/Pragmatic.SourceGenerator/
├── OrderCreatedHandler.Pipeline.g.cs # Resilience pipeline
├── InvoicePaidHandler.Pipeline.g.cs # Resilience pipeline
├── CheckInSaga.Orchestrator.g.cs # Saga state routing + compensation
├── _Infra.Messaging.Registration.g.cs # DI registration for all handlers
├── _Infra.Messaging.TypeRegistry.g.cs # AOT-safe message type map
├── _Infra.Messaging.Topology.g.cs # Exchange/queue declarations
├── _Infra.Messaging.Routing.g.cs # Routing table
└── _Metadata.MessageHandlers.g.cs # Handler metadata

  1. IMessage is independent from IDomainEvent — no coupling between messaging and persistence event systems.
  2. OutboxInterceptor replaces DomainEventsInterceptor when messaging is detected — the SG makes this a compile-time decision.
  3. MessageTypeRegistry is a switch expression over string names — zero Type.GetType(), zero reflection.

The runtime never scans assemblies, never calls GetType(), never resolves handlers via reflection. The SG generates:

  • Direct handler registration (not services.AddScoped(typeof(IMessageHandler<>)))
  • Type registry as switch expression (not Type.GetType(typeName))
  • Pipeline as inline code (not middleware chain resolved at runtime)
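
A type registry written as a switch expression could look something like this. It is an illustrative sketch only: MessageTypeRegistrySketch, its Resolve method, and the choice of the short type name as the key are assumptions, and the two records simply repeat message shapes used elsewhere in this document.

```csharp
#nullable enable
using System;

// Demo: map a type name from a message envelope back to a CLR type.
Console.WriteLine(MessageTypeRegistrySketch.Resolve("OrderCreated") == typeof(OrderCreated));

public sealed record OrderCreated(Guid OrderId, decimal Total, string CustomerId);
public sealed record InvoicePaid(Guid InvoiceId, Guid ReservationId, decimal Amount);

public static class MessageTypeRegistrySketch
{
    // typeof(...) is a compile-time reference, so the trimmer and the AOT compiler
    // can see every message type; no Type.GetType(string) lookup is needed.
    public static Type? Resolve(string typeName) => typeName switch
    {
        "OrderCreated" => typeof(OrderCreated),
        "InvoicePaid" => typeof(InvoicePaid),
        _ => null, // unknown names are rejected instead of probing loaded assemblies
    };
}
```

A generator can emit one arm per discovered message type, so adding a message never requires touching a registry by hand.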

| Avoid | Prefer |
|---|---|
| Runtime handler discovery | [MessageHandler] attribute |
| Polly policies in DI | [Retry], [CircuitBreaker], [Timeout] on handler |
| Type.GetType() for deserialization | SG-generated IMessageTypeRegistry |
| Global retry config | Per-handler resilience attributes |
| Saga discovery by reflection | [Saga<TState>] with compile-time graph validation |
| Manual handler registration | SG-generated registration |

// Messages
public record ReservationConfirmed(Guid ReservationId, string GuestEmail, DateTimeOffset CheckIn);
public record InvoicePaid(Guid InvoiceId, Guid ReservationId, decimal Amount);

// Handler with full resilience
[MessageHandler]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 200)]
[CircuitBreaker(FailureThreshold = 5, BreakDurationSeconds = 30)]
[Timeout(TimeoutSeconds = 60)]
public sealed partial class ReservationConfirmedHandler(
    IEmailService email) : IMessageHandler<ReservationConfirmed>
{
    public async Task HandleAsync(
        ReservationConfirmed message, MessageContext context, CancellationToken ct)
    {
        await email.SendConfirmationAsync(message.ReservationId, message.GuestEmail, ct);
    }
}

[MessageHandler]
[Retry(MaxAttempts = 3)]
public sealed partial class InvoicePaidHandler(
    IReservationService reservations) : IMessageHandler<InvoicePaid>
{
    public async Task HandleAsync(
        InvoicePaid message, MessageContext context, CancellationToken ct)
    {
        await reservations.MarkPaidAsync(message.ReservationId, message.Amount, ct);
    }
}

// Host configuration
await PragmaticApp.RunAsync(args, app =>
{
    app.UseMessaging(msg =>
    {
        msg.UseChannels(ch => { ch.Capacity = 1000; ch.ConsumerCount = 2; });
        msg.EnableIdempotency();
        msg.EnableOutbox(o => { o.PollingIntervalSeconds = 5; o.BatchSize = 100; });
        msg.EnableAuditing();
        msg.EnableSagas();
        msg.EnableBatchProcessing();
    });
});