Pragmatic.Messaging
Event-driven messaging for .NET 10 with zero reflection, AOT-safe handler pipelines, transactional outbox, saga orchestration, batch processing, and multi-transport support — all source-generated at compile time.
The Problem
Section titled “The Problem”Messaging in .NET typically means MassTransit or NServiceBus. Both are powerful, but both rely on runtime assembly scanning for handler discovery, dynamic dispatch via GetType(), and runtime expression evaluation for middleware pipelines. Retry policies live in a separate Polly configuration. Sagas are discovered by reflection. The connection between a handler and its resilience configuration is implicit.
// Without Pragmatic: MassTransit-style — config scattered across filesservices.AddMassTransit(x =>{ x.AddConsumer<OrderCreatedConsumer>(); // Runtime discovery x.UsingRabbitMq((context, cfg) => { cfg.ReceiveEndpoint("order-created", e => { e.UseMessageRetry(r => r.Intervals(200, 500, 1000)); // Separate from handler e.UseCircuitBreaker(cb => { cb.TrackingPeriod = TimeSpan.FromMinutes(1); }); e.ConfigureConsumer<OrderCreatedConsumer>(context); }); });});
// The handler class has no idea about retry, circuit breaker, or timeoutpublic class OrderCreatedConsumer : IConsumer<OrderCreated>{ public async Task Consume(ConsumeContext<OrderCreated> context) { ... }}The Solution
Section titled “The Solution”With Pragmatic.Messaging, resilience is declared directly on the handler. The source generator produces the complete pipeline — retry loop, circuit breaker, timeout, idempotency, telemetry — as inline code at compile time.
[MessageHandler][Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 200)][CircuitBreaker(FailureThreshold = 5, BreakDurationSeconds = 30)][Timeout(TimeoutSeconds = 60)]public sealed partial class OrderCreatedHandler( IOrderService service) : IMessageHandler<OrderCreated>{ public async Task HandleAsync(OrderCreated message, MessageContext context, CancellationToken ct) { await service.ProcessAsync(message.OrderId, ct); }}The SG generates OrderCreatedHandler_Pipeline.g.cs with the retry loop, circuit breaker state, timeout token, idempotency check, and telemetry — all inline, zero Polly, zero reflection.
Installation
Section titled “Installation”dotnet add package Pragmatic.Messaging # Core + EF Core outboxdotnet add package Pragmatic.Messaging.Core # Interfaces, attributes, InMemory busAdd transport packages as needed:
dotnet add package Pragmatic.Messaging.Channels # In-process asyncdotnet add package Pragmatic.Messaging.RabbitMQ # Distributed (AMQP)dotnet add package Pragmatic.Messaging.Kafka # Event streamingAdd the source generator:
<ProjectReference Include="..\Pragmatic.SourceGenerator\src\Pragmatic.SourceGenerator\Pragmatic.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />Quick Start
Section titled “Quick Start”1. Define a message and handler
Section titled “1. Define a message and handler”// The message (a simple record)public record OrderCreated(Guid OrderId, decimal Total, string CustomerId);
// The handler[MessageHandler][Retry(MaxAttempts = 3)]public sealed partial class OrderCreatedHandler( INotificationService notifications) : IMessageHandler<OrderCreated>{ public async Task HandleAsync(OrderCreated message, MessageContext context, CancellationToken ct) { await notifications.SendOrderConfirmationAsync(message.OrderId, ct); }}2. Configure the host
Section titled “2. Configure the host”await PragmaticApp.RunAsync(args, app =>{ app.UseMessaging(msg => { msg.UseChannels(ch => { ch.Capacity = 1000; ch.ConsumerCount = 2; }); msg.EnableIdempotency(); });});3. Publish
Section titled “3. Publish”public class CheckoutService(IMessageBus bus){ public async Task CompleteCheckoutAsync(Order order, CancellationToken ct) { await bus.PublishAsync(new OrderCreated(order.Id, order.Total, order.CustomerId), ct); }}The SG auto-discovers handlers by [MessageHandler]. No manual registration needed.
Package Architecture
Section titled “Package Architecture”| Package | Description | Dependencies |
|---|---|---|
Pragmatic.Messaging.Core | Interfaces, attributes, InMemory bus, routing, serializer | None |
Pragmatic.Messaging | Meta-package + EF Core outbox (OutboxInterceptor, OutboxDeliveryService) | Core, EF Core |
Pragmatic.Messaging.Channels | In-process transport via System.Threading.Channels | Core |
Pragmatic.Messaging.RabbitMQ | RabbitMQ transport with connection pooling, auto-reconnect | Core |
Pragmatic.Messaging.Kafka | Kafka transport with consumer groups, manual commit | Core |
Pragmatic.Messaging.Saga | Saga orchestration: ISaga<T>, repositories, EF Core persistence | Core |
Pragmatic.Messaging.Saga.EFCore | EF Core saga persistence (SagaInstance, SagaStep tables) | Saga |
Pragmatic.Messaging.Batch | Batch dispatcher, splitter, progress tracking | Core |
Pragmatic.Messaging.Auditing | Message audit trail with middleware | Core |
Pragmatic.Messaging.Jobs | Scheduled message delivery via Pragmatic.Jobs | Core, Jobs |
Pragmatic.Messaging.EFCore | EF Core stores: outbox source, idempotency, audit | Core, EF Core |
Pragmatic.Messaging.Testing | MessageBusTestHarness for assertions in tests | Core |
Handlers
Section titled “Handlers”IMessageHandler<T>
Section titled “IMessageHandler<T>”The core handler interface. One method, strongly typed.
public interface IMessageHandler<in T>{ int Order => 0; // Execution order among handlers for same message type Task HandleAsync(T message, MessageContext context, CancellationToken ct = default);}Declaring Handlers
Section titled “Declaring Handlers”[MessageHandler]public sealed partial class ReservationConfirmedHandler( IEmailService email, ILogger<ReservationConfirmedHandler> logger) : IMessageHandler<ReservationConfirmed>{ public async Task HandleAsync( ReservationConfirmed message, MessageContext context, CancellationToken ct) { logger.LogInformation("Reservation {Id} confirmed", message.ReservationId); await email.SendConfirmationAsync(message.ReservationId, message.GuestEmail, ct); }}The class must be partial (PRAG0801 warning if not) and implement IMessageHandler<T> (PRAG0800 error if not).
Handler Order
Section titled “Handler Order”When multiple handlers consume the same message type, the Order property controls execution sequence:
[MessageHandler(Order = 1)]public sealed partial class AuditOrderHandler : IMessageHandler<OrderCreated>{ // Runs first}
[MessageHandler(Order = 10)]public sealed partial class ProcessOrderHandler : IMessageHandler<OrderCreated>{ // Runs after audit}Resilience Pipeline
Section titled “Resilience Pipeline”Handlers are decorated with resilience attributes. The SG generates the complete pipeline at compile time.
[Retry]
Section titled “[Retry]”Inline retry loop with configurable backoff.
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 200)]| Property | Type | Default | Description |
|---|---|---|---|
MaxAttempts | int | 3 | Maximum retry attempts |
Strategy | BackoffStrategy | Exponential | Fixed, Exponential, ExponentialWithJitter |
BaseDelayMs | int | 200 | Base delay between retries (ms) |
[CircuitBreaker]
Section titled “[CircuitBreaker]”Static per-handler circuit state using Interlocked.
[CircuitBreaker(FailureThreshold = 5, BreakDurationSeconds = 30)]| Property | Type | Default | Description |
|---|---|---|---|
FailureThreshold | int | 5 | Consecutive failures before circuit opens |
BreakDurationSeconds | int | 30 | Seconds circuit stays open before half-open probe |
[Timeout]
Section titled “[Timeout]”Linked CancellationToken with deadline.
[Timeout(TimeoutSeconds = 60)]| Property | Type | Default | Description |
|---|---|---|---|
TimeoutSeconds | int | 30 | Maximum execution time per invocation |
Pipeline Composition
Section titled “Pipeline Composition”All three attributes compose in a fixed order:
Timeout → Retry → CircuitBreaker → HandlerThe timeout wraps the entire retry loop. Each retry attempt checks the circuit breaker before invoking the handler.
IMessageBus
Section titled “IMessageBus”The IMessageBus interface supports three dispatch patterns:
public interface IMessageBus{ // Publish — fan-out to all handlers Task PublishAsync<T>(T message, CancellationToken ct = default) where T : notnull; Task PublishAsync<T>(T message, MessageContext context, CancellationToken ct = default) where T : notnull;
// Send — point-to-point, single consumer Task SendAsync<T>(T message, CancellationToken ct = default) where T : notnull; Task SendAsync<T>(T message, MessageContext context, CancellationToken ct = default) where T : notnull;
// Request/Reply Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest request, CancellationToken ct = default) where TRequest : notnull where TResponse : notnull;
// Untyped dispatch (used by outbox delivery) Task DispatchAsync(object message, MessageContext context, CancellationToken ct = default);}Publish vs Send
Section titled “Publish vs Send”| Method | Semantics | Use Case |
|---|---|---|
PublishAsync | Fan-out (all handlers) | Domain events, notifications |
SendAsync | Point-to-point (one handler) | Commands, task dispatch |
MessageContext
Section titled “MessageContext”Every message carries metadata:
public sealed record MessageContext( string MessageId, // Unique per message instance string? CorrelationId, // Trace correlation string? TenantId, // Multi-tenant isolation string? UserId, // Originating user IReadOnlyDictionary<string, string>? Headers, // Extensible int RetryCount, // 0 = first attempt string? BusName, // null = default bus string? SourceBoundary, // Producing boundary DateTimeOffset? EnqueuedAt); // Transport timestamp// Create with auto-generated MessageIdvar context = MessageContext.New( correlationId: "order-123", tenantId: "tenant-acme", headers: new Dictionary<string, string> { ["x-source"] = "api" });
await bus.PublishAsync(new OrderCreated(orderId, total), context);Transport Configuration
Section titled “Transport Configuration”InMemory (Default)
Section titled “InMemory (Default)”Direct in-process dispatch. No async decoupling. Suitable for development and testing.
msg.UseInMemory(); // Explicit no-op — InMemory is the defaultChannels (In-Process Async)
Section titled “Channels (In-Process Async)”Uses System.Threading.Channels for async decoupling with configurable backpressure.
msg.UseChannels(ch =>{ ch.Capacity = 1000; // Per-channel buffer size ch.ConsumerCount = 2; // Consumer tasks per subscription ch.FullMode = BoundedChannelFullMode.Wait; // Backpressure: block producer});Registers ChannelTransport as IMessageTransport and ChannelConsumerService as a hosted service. Includes ChannelTransportHealthCheck for health monitoring.
RabbitMQ (Distributed)
Section titled “RabbitMQ (Distributed)”Full-featured RabbitMQ transport with connection pooling, auto-reconnect, topic exchange, manual ack/nack, and prefetch QoS.
msg.UseRabbitMq(rmq =>{ rmq.ConnectionString = "amqp://guest:guest@localhost:5672"; rmq.ConsumerPrefetchCount = 10; // QoS prefetch count rmq.AutoReconnect = true; // Auto-reconnect on connection loss rmq.ReconnectBaseDelayMs = 1000; // Initial reconnect delay rmq.MaxReconnectAttempts = 0; // 0 = infinite rmq.DurableQueues = true; // Survive broker restart rmq.PersistentMessages = true; // Messages survive restart rmq.ExchangeType = "topic"; // Exchange type});Docker compose for development:
services: rabbitmq: image: rabbitmq:3.13-management ports: - "5672:5672" - "15672:15672" # Management UIIncludes RabbitMqHealthCheck for health monitoring.
Kafka (Event Streaming)
Section titled “Kafka (Event Streaming)”Apache Kafka transport for high-throughput event streaming with consumer groups.
msg.UseKafka(k =>{ k.BootstrapServers = "localhost:9092"; k.GroupId = "my-boundary"; // Consumer group k.EnableIdempotence = true; // Exactly-once producer k.EnableAutoCommit = false; // Manual commit after handler k.AutoOffsetReset = "earliest"; // Start from beginning k.MaxPollIntervalMs = 300000; // 5-minute max poll interval k.SessionTimeoutMs = 45000; // Session timeout});Outbox Pattern
Section titled “Outbox Pattern”The transactional outbox ensures that domain events are published if and only if the database transaction commits. No distributed transactions needed.
Entity.RaiseDomainEvent(new OrderCreated(...)) → SaveChangesAsync() → OutboxInterceptor writes to __OutboxMessages (same transaction) → Transaction commits (or rolls back — both atomically) → OutboxDeliveryService polls and delivers to transportConfiguration
Section titled “Configuration”msg.EnableOutbox(outbox =>{ outbox.PollingIntervalSeconds = 5; // How often to poll for pending messages outbox.BatchSize = 100; // Messages per delivery batch outbox.MaxRetries = 3; // Delivery retry attempts});DbContext Setup
Section titled “DbContext Setup”Add [EnableOutbox] to your boundary DbContext:
[EnableOutbox]public class BookingDbContext : PragmaticDbContext { ... }The SG generates:
OutboxSourceimplementation (IOutboxSource)OutboxEntityTypeConfigurationfor the__OutboxMessagestable
EF Core Package
Section titled “EF Core Package”For EF Core-backed stores (outbox source, idempotency, audit):
dotnet add package Pragmatic.Messaging.EFCoreIdempotency
Section titled “Idempotency”Message deduplication at the handler level.
msg.EnableIdempotency();Every handler checks IIdempotencyStore.TryMarkAsProcessedAsync(context.MessageId) before execution. Duplicate messages (same MessageId) are silently skipped.
| Store | Package | Use Case |
|---|---|---|
InMemoryIdempotencyStore | Pragmatic.Messaging.Core | Development (default) |
EfCoreIdempotencyStore | Pragmatic.Messaging.EFCore | Production |
Saga Orchestration
Section titled “Saga Orchestration”Sagas coordinate long-running business processes as a sequence of events and actions with state tracking and compensation.
Defining a Saga
Section titled “Defining a Saga”public enum CheckInState{ Initial, GuestVerified, RoomAssigned, Completed, Cancelled}
[Saga<CheckInState>]public partial class CheckInSaga : ISaga<CheckInState>{ public Guid Id { get; set; } public CheckInState State { get; set; } public string CorrelationId { get; set; } = ""; public DateTimeOffset StartedAt { get; set; } public DateTimeOffset? CompletedAt { get; set; }
// Saga-specific data public string? RoomNumber { get; set; }
[SagaStart] public VerifyGuestAction Handle(GuestArrived @event) { return new VerifyGuestAction(@event.GuestId); }
[InState(CheckInState.GuestVerified, NextState = CheckInState.RoomAssigned)] [CompensateWith<CancelCheckInAction>] public AssignRoomAction Handle(IdentityVerified @event) { return new AssignRoomAction(@event.GuestId); }
[InState(CheckInState.RoomAssigned, NextState = CheckInState.Completed)] public object? Handle(RoomAssigned @event) { RoomNumber = @event.RoomNumber; CompletedAt = DateTimeOffset.UtcNow; return null; // Terminal — no further action }}ISaga<T> Interface
Section titled “ISaga<T> Interface”public interface ISaga<TState> where TState : struct, Enum{ Guid Id { get; set; } TState State { get; set; } string CorrelationId { get; set; } DateTimeOffset StartedAt { get; set; } DateTimeOffset? CompletedAt { get; set; }}Saga Attributes
Section titled “Saga Attributes”| Attribute | Target | Description |
|---|---|---|
[Saga<TState>] | Class | Marks a saga with state enum |
[SagaStart] | Method | Entry point handler (Initial state) |
[InState(state, NextState)] | Method | Handler for a specific state |
[CompensateWith<TAction>] | Method | Action dispatched on failure (rollback) |
[SagaTimeout(Seconds)] | Class | Auto-cancel if not completed within timeout |
SG-Generated Orchestrator
Section titled “SG-Generated Orchestrator”The SG generates {Saga}.Orchestrator.g.cs with:
- State routing:
switchon current state + incoming event type - Compensation chain: on failure, dispatches
[CompensateWith]actions in reverse order - Transition validation: compile-time graph analysis from
[InState]+NextState
Persistence
Section titled “Persistence”msg.EnableSagas(); // InMemory repositories (dev)msg.EnableSagaPersistence(); // EF Core repositories (prod)EF Core persistence uses two tables:
| Table | Purpose |
|---|---|
__SagaInstances | Saga state, correlation, timestamps |
__SagaSteps | Individual step execution history |
Batch Processing
Section titled “Batch Processing”Split large operations into individually tracked items, published via the message bus.
1. Define a Splitter
Section titled “1. Define a Splitter”public record RecalculatePricesCommand(Guid[] ProductIds, string Reason);public record RecalculatePriceBatch(Guid[] ProductIds);
public class PriceRecalculationSplitter : IBatchSplitter<RecalculatePricesCommand, RecalculatePriceBatch>{ public IReadOnlyList<RecalculatePriceBatch> Split(RecalculatePricesCommand command) => command.ProductIds .Chunk(100) .Select(chunk => new RecalculatePriceBatch(chunk)) .ToList();}2. Dispatch
Section titled “2. Dispatch”var batchId = await dispatcher.DispatchAsync( new RecalculatePricesCommand(productIds, "quarterly-review"), label: "Price recalculation Q1");The BatchDispatcher<TBatch, TItem> splits the command, creates a BatchProgress entry, and publishes each item to the bus with batch headers (BatchId, ItemIndex, TotalItems).
3. Report Progress in Handlers
Section titled “3. Report Progress in Handlers”[MessageHandler]public sealed partial class PriceBatchHandler( IBatchProgressStore store) : IMessageHandler<RecalculatePriceBatch>{ public async Task HandleAsync( RecalculatePriceBatch batch, MessageContext context, CancellationToken ct) { try { // Process the batch... await BatchTracker.ReportSuccessAsync(context, store, ct); } catch (Exception) { await BatchTracker.ReportFailureAsync(context, store, ct); throw; } }}4. Query Progress
Section titled “4. Query Progress”var progress = await store.GetProgressAsync(batchId);// progress.Total = 2000// progress.Completed = 1450// progress.Failed = 12// progress.Pending = 538// progress.ProgressPercent = 73.1Configuration
Section titled “Configuration”msg.EnableBatchProcessing(); // InMemory progress storemsg.EnableBatchPersistence(); // EF Core progress storeRequest/Reply
Section titled “Request/Reply”Point-to-point request with a typed response.
Define a Handler
Section titled “Define a Handler”[RequestHandler]public sealed partial class GetPriceHandler : IRequestHandler<GetPriceRequest, PriceResponse>{ public Task<PriceResponse> HandleAsync( GetPriceRequest request, MessageContext context, CancellationToken ct) { return Task.FromResult(new PriceResponse(request.ProductId, 29.99m)); }}var response = await bus.RequestAsync<GetPriceRequest, PriceResponse>( new GetPriceRequest(productId));Request/Reply uses queue semantics (single consumer). For cross-service calls, consider using RemoteBoundary instead, which provides typed HTTP invokers.
Multi-Bus
Section titled “Multi-Bus”Isolate message flows by routing handlers to named buses.
Handler Declaration
Section titled “Handler Declaration”[MessageHandler][OnBus("analytics")]public sealed partial class PageViewedHandler : IMessageHandler<PageViewed>{ public async Task HandleAsync(PageViewed message, MessageContext context, CancellationToken ct) { // Runs on the "analytics" bus, isolated from business-critical traffic }}Bus Configuration
Section titled “Bus Configuration”msg.AddBus("analytics", bus =>{ // Each named bus can have its own transport // bus.UseRabbitMq(rmq => { rmq.ConnectionString = "amqp://analytics-rmq:5672"; });});Handlers without [OnBus] consume from the default bus. Named buses are fully independent with their own transport and routing.
Auditing
Section titled “Auditing”Record lifecycle events for every message.
msg.EnableAuditing(a =>{ a.IncludePayload = false; // Exclude message body from audit entries (default: true)});AuditMiddleware (Order -100) wraps every handler and records:
- Published / Handled / Failed / DeadLettered events
- Duration, handler name, correlation ID, tenant
Query Audit Trail
Section titled “Query Audit Trail”var entries = await auditStore.QueryAsync(new AuditQuery{ CorrelationId = "order-123", FromDate = DateTimeOffset.UtcNow.AddDays(-7)});| Store | Package | Use Case |
|---|---|---|
InMemoryAuditStore | Pragmatic.Messaging.Auditing | Development (default) |
EfCoreAuditStore | Pragmatic.Messaging.EFCore | Production |
Scheduled Messages (Jobs Bridge)
Section titled “Scheduled Messages (Jobs Bridge)”Pragmatic.Messaging.Jobs enables future message delivery backed by Pragmatic.Jobs:
msg.EnableScheduledMessages(); // Requires Pragmatic.Messaging.JobsThis registers IMessageScheduler:
public interface IMessageScheduler{ Task<Guid> ScheduleAsync<T>(T message, TimeSpan delay, CancellationToken ct = default) where T : notnull; Task<Guid> ScheduleAsync<T>(T message, DateTimeOffset scheduledAt, CancellationToken ct = default) where T : notnull; Task CancelAsync(Guid scheduleId, CancellationToken ct = default);}// Deliver in 24 hoursawait scheduler.ScheduleAsync( new CheckoutReminder(reservationId), delay: TimeSpan.FromHours(24));
// Deliver at specific timeawait scheduler.ScheduleAsync( new CheckoutReminder(reservationId), scheduledAt: checkout.AddHours(-2));Internally creates a PublishMessageJob executed by the Jobs scheduler.
Testing
Section titled “Testing”Pragmatic.Messaging.Testing provides MessageBusTestHarness — an in-memory bus that records all published messages without dispatching to handlers.
using Pragmatic.Messaging.Testing;
public class OrderServiceTests{ [Fact] public async Task CompleteCheckout_PublishesOrderCreated() { // Arrange var harness = new MessageBusTestHarness(); var service = new CheckoutService(harness);
// Act await service.CompleteCheckoutAsync(order, CancellationToken.None);
// Assert harness.HasPublished<OrderCreated>().Should().BeTrue(); harness.PublishedOf<OrderCreated>() .Should().ContainSingle(m => m.OrderId == order.Id); }}MessageBusTestHarness API
Section titled “MessageBusTestHarness API”| Method | Description |
|---|---|
Published | All recorded messages |
PublishedOf<T>() | All messages of type T |
HasPublished<T>() | True if any message of type T was published |
HasPublished<T>(predicate) | True if any matching message was published |
Reset() | Clear all recorded messages |
RequestAsync throws InvalidOperationException (records the request but cannot resolve a handler). Use a mock IRequestHandler<TRequest, TResponse> for request/reply testing.
Dead Letter Store
Section titled “Dead Letter Store”Messages that exhaust all retry attempts are sent to the dead letter store for manual investigation.
public interface IDeadLetterStore{ Task StoreAsync(DeadLetterMessage message, CancellationToken ct = default); Task<IReadOnlyList<DeadLetterMessage>> GetAllAsync(CancellationToken ct = default);}| Store | Package | Use Case |
|---|---|---|
InMemoryDeadLetterStore | Pragmatic.Messaging.Core | Development (default) |
To disable the default store (when providing a custom implementation):
msg.DisableInMemoryDeadLetter();Host Integration
Section titled “Host Integration”With Pragmatic.Composition, the SG auto-detects Pragmatic.Messaging via FeatureDetector and generates host registration:
// PragmaticHost.g.cs (auto-generated)services.AddPragmaticMessaging();services.AddPragmaticMessageHandlers();The SG detects additional packages and registers them:
HasMessagingChannels→ ChannelTransportHasMessagingRabbitMq→ RabbitMqTransportHasMessagingAuditing→ AuditMiddleware
Observability
Section titled “Observability”OpenTelemetry
Section titled “OpenTelemetry”ActivitySource: Pragmatic.Messaging
Handler spans include tags for message type, handler name, tenant, correlation ID, and retry count.
Metrics
Section titled “Metrics”| Metric | Type | Description |
|---|---|---|
pragmatic.messaging.published | Counter | Total messages published |
pragmatic.messaging.handler_failures | Counter | Handler failures |
pragmatic.messaging.handler_duration | Histogram (ms) | Handler execution duration |
pragmatic.messaging.retry_attempts | Counter | Total retry attempts |
pragmatic.messaging.circuit_breaker_trips | Counter | Circuit breaker open events |
pragmatic.messaging.idempotency_duplicates | Counter | Duplicate messages skipped |
pragmatic.messaging.dead_lettered | Counter | Messages sent to dead letter |
pragmatic.messaging.outbox_delivery_duration | Histogram (ms) | Outbox delivery batch duration |
Diagnostics
Section titled “Diagnostics”The source generator emits diagnostics for common mistakes:
Handler Diagnostics
Section titled “Handler Diagnostics”| ID | Severity | Message |
|---|---|---|
| PRAG0800 | Error | [MessageHandler] on class not implementing IMessageHandler<T> |
| PRAG0801 | Warning | Handler should be partial |
| PRAG0802 | Error | [Retry] with MaxAttempts <= 0 |
| PRAG0803 | Error | [MessageMiddleware] on class not implementing IMessageMiddleware |
Saga Diagnostics
Section titled “Saga Diagnostics”| ID | Severity | Message |
|---|---|---|
| PRAG0810 | Error | Inconsistent saga state transition |
| PRAG0811 | Info | Saga state has no handler (terminal states OK) |
| PRAG0812 | Warning | Unreachable saga states |
| PRAG0813 | Error | [Saga<T>] where T is not an enum |
| PRAG0814 | Error | Saga without [SagaStart] |
Routing Diagnostics
Section titled “Routing Diagnostics”| ID | Severity | Message |
|---|---|---|
| PRAG0815 | Warning | Cross-boundary handler without [DependsOn] |
| PRAG0816 | Warning | Event type has no consumers |
| PRAG0817 | Error | Queue name collision |
| PRAG0818 | Warning | Cross-boundary handler without outbox |
Outbox Diagnostics
Section titled “Outbox Diagnostics”| ID | Severity | Message |
|---|---|---|
| PRAG0830 | Error | [EnableOutbox] on non-DbContext class |
| PRAG0831 | Error | [EnableOutbox] without Pragmatic.Messaging.EFCore reference |
SG-Generated Output
Section titled “SG-Generated Output”For a project with two handlers and one saga, the SG produces:
obj/GeneratedFiles/Pragmatic.SourceGenerator/├── OrderCreatedHandler.Pipeline.g.cs # Resilience pipeline├── InvoicePaidHandler.Pipeline.g.cs # Resilience pipeline├── CheckInSaga.Orchestrator.g.cs # Saga state routing + compensation├── _Infra.Messaging.Registration.g.cs # DI registration for all handlers├── _Infra.Messaging.TypeRegistry.g.cs # AOT-safe message type map├── _Infra.Messaging.Topology.g.cs # Exchange/queue declarations├── _Infra.Messaging.Routing.g.cs # Routing table└── _Metadata.MessageHandlers.g.cs # Handler metadataDesign Decisions
Section titled “Design Decisions”AOT Safety
Section titled “AOT Safety”- IMessage is independent from IDomainEvent — no coupling between messaging and persistence event systems.
- OutboxInterceptor replaces DomainEventsInterceptor when messaging is detected — the SG makes this a compile-time decision.
- MessageTypeRegistry is a
switchexpression over string names — zeroType.GetType(), zero reflection.
Zero Reflection
Section titled “Zero Reflection”The runtime never scans assemblies, never calls GetType(), never resolves handlers via reflection. The SG generates:
- Direct handler registration (not
services.AddScoped(typeof(IMessageHandler<>))) - Type registry as
switchexpression (notType.GetType(typeName)) - Pipeline as inline code (not middleware chain resolved at runtime)
Anti-Patterns
Section titled “Anti-Patterns”| Avoid | Prefer |
|---|---|
| Runtime handler discovery | [MessageHandler] attribute |
| Polly policies in DI | [Retry], [CircuitBreaker], [Timeout] on handler |
Type.GetType() for deserialization | SG-generated IMessageTypeRegistry |
| Global retry config | Per-handler resilience attributes |
| Saga discovery by reflection | [Saga<TState>] with compile-time graph validation |
| Manual handler registration | SG-generated registration |
Complete Example
Section titled “Complete Example”// Messagespublic record ReservationConfirmed(Guid ReservationId, string GuestEmail, DateTimeOffset CheckIn);public record InvoicePaid(Guid InvoiceId, Guid ReservationId, decimal Amount);
// Handler with full resilience[MessageHandler][Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 200)][CircuitBreaker(FailureThreshold = 5, BreakDurationSeconds = 30)][Timeout(TimeoutSeconds = 60)]public sealed partial class ReservationConfirmedHandler( IEmailService email) : IMessageHandler<ReservationConfirmed>{ public async Task HandleAsync( ReservationConfirmed message, MessageContext context, CancellationToken ct) { await email.SendConfirmationAsync(message.ReservationId, message.GuestEmail, ct); }}
[MessageHandler][Retry(MaxAttempts = 3)]public sealed partial class InvoicePaidHandler( IReservationService reservations) : IMessageHandler<InvoicePaid>{ public async Task HandleAsync( InvoicePaid message, MessageContext context, CancellationToken ct) { await reservations.MarkPaidAsync(message.ReservationId, message.Amount, ct); }}
// Host configurationawait PragmaticApp.RunAsync(args, app =>{ app.UseMessaging(msg => { msg.UseChannels(ch => { ch.Capacity = 1000; ch.ConsumerCount = 2; }); msg.EnableIdempotency(); msg.EnableOutbox(o => { o.PollingIntervalSeconds = 5; o.BatchSize = 100; }); msg.EnableAuditing(); msg.EnableSagas(); msg.EnableBatchProcessing(); });});