Skip to content

Concepts

Architecture, mental model, and design decisions behind Pragmatic.Messaging.

Without a messaging framework, cross-boundary communication requires manual plumbing:

// Without Pragmatic.Messaging — manual event dispatch, no retry, no outbox
public class ReservationService
{
public async Task ConfirmReservation(Guid id)
{
var reservation = await _repo.GetAsync(id);
reservation.Status = ReservationStatus.Confirmed;
await _repo.SaveAsync(reservation);
// Manual event dispatch — no retry, no outbox, no audit
try
{
var @event = new ReservationConfirmed(id, reservation.GuestId, ...);
var handler = _serviceProvider.GetService<ReservationConfirmedHandler>();
await handler.HandleAsync(@event); // What if this fails?
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to notify billing");
// Lost event. Billing never creates invoice. Guest never gets charged.
}
}
}

Problems: no retry, no dead letter, no idempotency, no audit trail, no outbox (event lost if dispatch fails after save), no compile-time validation.

// With Pragmatic.Messaging — attribute-driven, SG-generated pipeline
[MessageHandler]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter)]
public sealed partial class ReservationConfirmedHandler(
IBillingActions billing) : IMessageHandler<ReservationConfirmed>
{
public async Task HandleAsync(ReservationConfirmed @event, MessageContext context, CancellationToken ct)
{
await billing.CreateDraftInvoice(@event.ReservationId, @event.GuestId, ...);
}
}

The SG generates a _Pipeline wrapper with retry loop, circuit breaker, idempotency check, telemetry, and logging. The outbox ensures the event is never lost. Zero reflection at runtime.

Entity raises DomainEvent
OutboxInterceptor (SaveChanges) → persists OutboxMessage atomically
OutboxDeliveryService (background) → polls pending messages
IMessageTypeRegistry (SG-generated) → deserializes to typed message (AOT-safe)
IMessageBus.DispatchAsync() → resolves handler(s)
Handler_Pipeline (SG-generated):
├── Idempotency check (IIdempotencyStore)
├── Circuit breaker gate (static state)
├── Retry loop (backoff strategy)
│ ├── Authorization bypass (ICallContext)
│ └── Handler.HandleAsync() (your code)
├── Metrics (duration, success/failure)
└── Logging ([LoggerMessage])

For each [MessageHandler] class, the SG generates {Handler}_Pipeline.g.cs:

LayerWhen GeneratedPurpose
IdempotencyAlways (null-safe)Skips duplicate MessageIds
Circuit Breaker[CircuitBreaker] presentRejects when failure threshold exceeded
Retry[Retry] presentRetries with configurable backoff
Timeout[Timeout] presentLinked CancellationToken
Auth BypassAuthorization detectedICallContext.EnterInternalCall()
TelemetryAlwaysActivity span + metrics counters
LoggingAlways[LoggerMessage] partials
IMessageBus
┌──────────────┼──────────────┐
▼ ▼ ▼
InMemoryBus ChannelTransport RabbitMqTransport
(dev/test) (in-process) (distributed)
│ │ │
└──────────────┼──────────────┘
IMessageRouter (SG-generated)
├── GetTopic<T>() → exchange name
├── GetQueue() → queue name
└── GetBusName() → named bus
ScenarioTransportOutboxPattern
Unit testsInMemoryNoDirect dispatch
Monolith, async decouplingChannelsOptionalBackground processing
Cross-service eventsRabbitMQYesOutbox + delivery
Event streaming, analyticsKafka (P3)YesTopic partitions
Multi-step workflowAnyYesSaga orchestration
Large data processingAnyNoBatch split + dispatch
TriggerGenerated FileContent
[MessageHandler]{Handler}.Pipeline.g.csRetry, CB, idempotency, telemetry wrapper
Aggregate_Infra.Messaging.Registration.g.csAddPragmaticMessageHandlers() DI extension
Aggregate_Infra.Messaging.TypeRegistry.g.csAOT-safe message type → deserializer map
Aggregate_Infra.Messaging.Metadata.g.csAssembly metadata for host composition
Transport ref_Infra.Messaging.Topology.g.csExchange/queue bindings
Transport ref_Infra.Messaging.Routing.g.csMessage type → topic/queue router
[EnableOutbox]_Infra.Outbox.{Boundary}.g.csOutboxSource + health check
[Saga<TState>]{Saga}.Orchestrator.g.csState machine router + compensation
Aggregate_Infra.Messaging.SagaRegistration.g.csSaga DI registration
// Program.cs — transport + features
await PragmaticApp.RunAsync(args, app =>
{
app.UseMessaging(msg =>
{
msg.UseChannels(ch => { ch.Capacity = 1000; });
msg.EnableOutbox();
msg.EnableIdempotency();
msg.EnableSagas();
msg.EnableBatchProcessing();
msg.EnableAuditing(a => a.IncludePayload = false);
});
});

The SG auto-registers handlers, type registry, routing — no manual DI needed.

builder.Services.AddPragmaticMessaging(msg =>
{
msg.UseChannels();
});
// SG-generated extension:
builder.Services.AddPragmaticMessageHandlers();

These are separate modules with different scopes:

Pragmatic.EventsPragmatic.Messaging
ScopeIn-process, same boundaryCross-boundary, cross-service, async
HandlerIDomainEventHandler<T>IMessageHandler<T>
DispatchInMemoryEventDispatcher (sync, same TX)IMessageBus (async, outbox, transport)
SGComposition featureMessaging feature (pipeline, resilience)

Bridge: The SG generates MessageHandlerEventAdapter<T> so domain events can also reach IMessageHandler<T>. This is optional and automatic when both packages are referenced.

ModuleIntegration
EventsOptional bridge: domain events reach message handlers via adapter
ActionsSaga steps dispatch DomainAction via IMessageBus
Persistence.EFCoreOutboxInterceptor on SaveChanges, [EnableOutbox] on DbContext
AuthorizationPipeline generates ICallContext.EnterInternalCall() bypass
MultiTenancyMessageContext.TenantId propagated to Activity tags
CompositionPragmaticHostTemplate auto-calls AddPragmaticMessageHandlers()