Architecture and Core Concepts
This guide explains why Pragmatic.Jobs exists, how its pieces fit together, and how to choose the right abstraction for each situation. Read this before diving into the individual feature guides.
The Problem
Background job processing in .NET typically means pulling in Hangfire or Quartz.NET. Both are capable, but both rely on runtime reflection for job discovery, serialization, and invocation. They require separate dashboard packages, separate storage providers, and separate retry configuration that lives outside your job definition.
The Hangfire approach: config is separate from the job
```csharp
// Job class -- knows nothing about how it will be scheduled
public class DailyReportJob
{
    private readonly IReportService _reports;

    public DailyReportJob(IReportService reports) => _reports = reports;

    public async Task Execute() => await _reports.GenerateDailyAsync(DateTime.UtcNow.Date);
}

// Somewhere else -- scheduling config is disconnected from the job
RecurringJob.AddOrUpdate<DailyReportJob>(
    "daily-report",
    x => x.Execute(), // Expression tree -> reflection at runtime
    "0 2 * * *",
    new RecurringJobOptions { TimeZone = TimeZoneInfo.FindSystemTimeZoneById("Europe/Rome") });

// Yet another place -- retry is global or per-filter
GlobalJobFilters.Filters.Add(new AutomaticRetryAttribute { Attempts = 3 });
```

Problems with this approach:
- Configuration is scattered. The job class, the schedule registration, and the retry policy all live in different places. To understand what a job does, you need to look in three locations.
- Reflection-heavy. RecurringJob.AddOrUpdate uses expression trees that are evaluated at runtime. Type.GetType() is used for deserialization. Neither is AOT-safe.
- No compile-time validation. If you typo a cron expression, rename a job class, or create a cycle between continuations, you find out at runtime, or not at all.
- Retry logic is external. Different jobs need different retry strategies, but configuring per-job retry requires filter attributes evaluated at runtime.
- No distributed locking out of the box. You need additional configuration for multi-instance deployments.
The Solution
With Pragmatic.Jobs, you declare scheduling, retry, and timeout directly on the job class. The source generator reads these attributes at compile time and produces an invoker with inline retry loops, linked cancellation tokens, telemetry, and DI wiring, with zero reflection.
```csharp
[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 1000)]
[Timeout(TimeoutSeconds = 600)]
public sealed partial class DailyReportJob(IReportService reports) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
}
```

That is the entire job definition. Everything a developer needs to know about how this job is scheduled, retried, and timed out is right here in the class declaration.
The source generator produces:
| Generated File | Purpose |
|---|---|
| DailyReportJob.Invoker.g.cs | Retry loop + timeout + telemetry wrapper |
| _Infra.Jobs.Registration.g.cs | DI registration for all discovered jobs |
| _Infra.Jobs.TypeRegistry.g.cs | AOT-safe switch expression (no Type.GetType) |
| _Infra.Jobs.RecurringJobs.g.cs | Recurring job definitions with cron expressions |
| _Metadata.Jobs.g.cs | Job metadata for host aggregation |
What you gain
- Single source of truth. Schedule, retry, timeout, and continuation are all declared on the job class.
- Compile-time validation. Invalid cron expressions (PRAG2501), duplicate recurring IDs (PRAG2503), and continuation cycles (PRAG2506) are all caught at build time.
- AOT-safe. The generated IJobTypeRegistry uses a switch expression on string FQNs. No Type.GetType(), no reflection.
- Distributed-safe. Lease-based locking works out of the box with EF Core persistence.
- Observable. Every execution creates an OpenTelemetry span with job type, attempt, and correlation ID. Metrics for enqueued, completed, failed, retried, and duration are built in.
How It Works
The job processing pipeline has four stages: Definition, Generation, Scheduling, and Execution.
Stage 1: Definition
You write a class that implements IJob (parameterless) or IJob<T> (typed parameters) and decorate it with [Job] or [RecurringJob]:

```csharp
[Job]
[Retry(MaxAttempts = 3)]
public sealed partial class SendReminderJob : IJob<ReminderParams>
{
    public async Task ExecuteAsync(ReminderParams p, JobContext context, CancellationToken ct)
    {
        // Your business logic
    }
}
```

Stage 2: Generation (Compile Time)
The source generator (JobsFeature in Pragmatic.SourceGenerator) processes every class with [Job] or [RecurringJob]:
- Transform: JobTransform extracts attribute data into an immutable JobModel record.
- Validate: JobsFeature.ValidateJobs runs aggregate diagnostics (duplicate IDs, continuation cycles).
- Template: JobInvokerTemplate generates the per-job invoker. JobRegistrationTemplate, JobTypeRegistryTemplate, RecurringJobRegistrationTemplate, and JobMetadataTemplate generate infrastructure files.
Stage 3: Scheduling (Runtime)
Jobs enter the system in two ways:
- Recurring: RecurringJobSchedulerService polls IRecurringJobStore every N seconds. When a recurring definition is due (based on cron evaluation in the configured timezone), it enqueues a JobInstance into IJobStore.
- On-demand: Your code calls IJobScheduler.ScheduleAsync<TJob>(...), which creates a JobInstance with the appropriate ScheduledFor timestamp.
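The recurring path can be pictured as a single polling tick. The sketch below is not the library's RecurringJobSchedulerService: RecurringDefinition, SchedulerSketch, and Tick are hypothetical names, the nextOccurrence delegate stands in for real cron evaluation, and skipping missed runs after a late poll is an assumed policy rather than documented behavior.

```csharp
using System;
using System.Collections.Generic;

var queue = new List<string>();
var definitions = new List<RecurringDefinition>
{
    new("daily-report", "MyApp.Jobs.DailyReportJob", new DateTimeOffset(2025, 1, 1, 2, 0, 0, TimeSpan.Zero))
};

// Stand-in for cron evaluation: "0 2 * * *" in UTC means "same time tomorrow".
Func<DateTimeOffset, DateTimeOffset> nextDaily = previous => previous.AddDays(1);

// Three polls: one before the due time, two shortly after it.
SchedulerSketch.Tick(definitions, new DateTimeOffset(2025, 1, 1, 1, 59, 0, TimeSpan.Zero), nextDaily, queue.Add);
SchedulerSketch.Tick(definitions, new DateTimeOffset(2025, 1, 1, 2, 0, 5, TimeSpan.Zero), nextDaily, queue.Add);
SchedulerSketch.Tick(definitions, new DateTimeOffset(2025, 1, 1, 2, 0, 10, TimeSpan.Zero), nextDaily, queue.Add);
Console.WriteLine(queue.Count); // 1

// Hypothetical in-memory recurring definition with next-run tracking.
public sealed class RecurringDefinition
{
    public RecurringDefinition(string id, string jobType, DateTimeOffset nextRunAt)
    {
        Id = id;
        JobType = jobType;
        NextRunAt = nextRunAt;
    }

    public string Id { get; }
    public string JobType { get; }
    public DateTimeOffset NextRunAt { get; set; }
}

public static class SchedulerSketch
{
    // One polling cycle: enqueue every due definition once, then move its next run into the future.
    public static void Tick(
        IEnumerable<RecurringDefinition> definitions,
        DateTimeOffset now,
        Func<DateTimeOffset, DateTimeOffset> nextOccurrence,
        Action<string> enqueue)
    {
        foreach (var definition in definitions)
        {
            if (definition.NextRunAt > now) continue; // not due yet

            enqueue(definition.JobType);

            // Advance past `now` so a late poll does not enqueue a backlog of missed runs.
            do { definition.NextRunAt = nextOccurrence(definition.NextRunAt); }
            while (definition.NextRunAt <= now);
        }
    }
}
```

Tracking the next run on the definition is what keeps the second and third polls from enqueueing the same occurrence twice.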
Stage 4: Execution (Runtime)
JobProcessorService is a BackgroundService that:
- Releases expired leases from crashed workers.
- Polls IJobStore.GetPendingAsync() for jobs due now.
- Acquires a lease (optimistic lock) on each job.
- Resolves the job from DI via IJobTypeRegistry.ExecuteAsync().
- Executes the generated invoker (which wraps your ExecuteAsync with retry, timeout, and telemetry).
- Marks the job as completed or failed.
- If completed and a continuation is declared, enqueues the next job.
```
[RecurringJob] ──> SG ──> RecurringJobSchedulerService ──> IJobStore
                                                              │
[Job] ──> IJobScheduler.ScheduleAsync() ──────────────────────┘
                                                              │
                                                              v
                                                     JobProcessorService
                                                              │
                                              ┌───────────────┼───────────────┐
                                              v               v               v
                                          Worker 1        Worker 2        Worker N
                                              │               │               │
                                       TryAcquireLease TryAcquireLease TryAcquireLease
                                              │               │               │
                                               IJobTypeRegistry.ExecuteAsync()
                                                              │
                                       Generated Invoker (retry + timeout + telemetry)
                                                              │
                                                     Your ExecuteAsync()
```

Decision Tree: Which Job Type?
IJob vs IJob<T>
| Scenario | Interface | Example |
|---|---|---|
| No input data needed | IJob | Daily report, session cleanup, health check |
| Needs input data | IJob<T> | Send email to specific guest, generate invoice for reservation |
```csharp
// Parameterless -- used for recurring tasks that derive context from the schedule
public sealed partial class CleanupExpiredSessionsJob : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        var cutoff = context.ScheduledAt.AddDays(-30);
        // ...
    }
}

// Typed parameters -- used for on-demand jobs that need specific input
public record ReminderParams(Guid ReservationId, Guid GuestId, string GuestEmail);

public sealed partial class SendCheckInReminderJob : IJob<ReminderParams>
{
    public async Task ExecuteAsync(ReminderParams p, JobContext context, CancellationToken ct)
    {
        // Send email to p.GuestEmail
    }
}
```

Recurring vs One-Off vs Delayed
| Scenario | Attribute | Scheduling |
|---|---|---|
| Fixed schedule (cron) | [RecurringJob("0 2 * * *")] | Automatic via RecurringJobSchedulerService |
| Fire-and-forget (now) | [Job] | scheduler.ScheduleAsync<T>() |
| Delayed (in N hours) | [Job] | scheduler.ScheduleAsync<T>(delay: TimeSpan.FromHours(24)) |
| At specific time | [Job] | scheduler.ScheduleAtAsync<T>(scheduledFor: ...) |
Rule of thumb: If the job runs on a calendar-based schedule, use [RecurringJob]. If the job is triggered by a business event (reservation confirmed, payment received), use [Job] and schedule it programmatically.
When to Use Continuations
Continuations are for sequential job chains where the second job should only run if the first succeeds:
```csharp
[Job]
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
    {
        // Generate invoice -- if this fails, the email is NOT sent
    }
}

[Job]
[Retry(MaxAttempts = 2)]
public sealed partial class SendInvoiceEmailJob : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        // Send email using context.CorrelationId to find the invoice
    }
}
```

The SG validates continuation chains at compile time:
- PRAG2505: Continuation type must implement IJob or IJob<T>.
- PRAG2506: No cycles allowed (A->B->A or A->B->C->A are compile errors).
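To make the cycle rule concrete, here is a minimal sketch of how such a check could work on a map from each job to its continuation. It is not the generator's actual implementation: ContinuationCheck and FindCycle are hypothetical names, and the sketch assumes each job declares at most one continuation.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Usage: A -> B -> C -> A is a cycle; A -> B -> C is not.
var cyclic = new Dictionary<string, string> { ["A"] = "B", ["B"] = "C", ["C"] = "A" };
var linear = new Dictionary<string, string> { ["A"] = "B", ["B"] = "C" };
Console.WriteLine(ContinuationCheck.FindCycle(cyclic) is not null); // True
Console.WriteLine(ContinuationCheck.FindCycle(linear) is not null); // False

public static class ContinuationCheck
{
    // `continuations` maps a job type name to the job it continues with.
    // Returns the job names that form a cycle, or null if every chain terminates.
    public static IReadOnlyList<string>? FindCycle(IReadOnlyDictionary<string, string> continuations)
    {
        var cleared = new HashSet<string>(); // jobs already known to end in a terminating chain

        foreach (var start in continuations.Keys)
        {
            var path = new List<string>();
            var onPath = new HashSet<string>();
            string? current = start;

            while (current is not null && !cleared.Contains(current))
            {
                if (!onPath.Add(current))
                {
                    // Revisited a job on the current path: the cycle begins at its first occurrence.
                    var startOfCycle = path.IndexOf(current);
                    return path.GetRange(startOfCycle, path.Count - startOfCycle);
                }

                path.Add(current);
                current = continuations.TryGetValue(current, out var next) ? next : null;
            }

            cleared.UnionWith(path);
        }

        return null;
    }
}
```

Returning the offending chain rather than a boolean lets a diagnostic name the jobs involved.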
JobContext
Every job execution receives an immutable JobContext record with metadata about the current invocation:
```csharp
public sealed record JobContext(
    Guid JobId,                 // Unique job instance ID
    string JobType,             // FQN of the job class
    DateTimeOffset ScheduledAt, // When the job was scheduled to run
    int Attempt,                // Current attempt number (0 = first)
    int MaxAttempts,            // Max retry attempts configured
    string? CorrelationId,      // Optional correlation for tracing
    string? TenantId);          // Optional tenant for multi-tenancy
```

Use JobContext for:
- Logging: Include JobId, Attempt, and ScheduledAt in log messages.
- Correlation: Use CorrelationId to link related operations across job chains.
- Idempotency: Use JobId to ensure at-most-once execution in external systems.
- Multi-tenancy: Use TenantId to scope database queries.
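The idempotency point can be illustrated with a small sketch. JobId stays the same across retries of one job instance, so it can serve as a deduplication key for an external call. FakePaymentGateway and the ChargeGuestJob name are hypothetical, and the JobContext record is a local copy of the shape shown above so the sketch compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var gateway = new FakePaymentGateway();
var context = new JobContext(
    Guid.NewGuid(), "MyApp.Jobs.ChargeGuestJob", DateTimeOffset.UtcNow, 0, 3, "corr-42", null);

// First attempt, then a retry of the same job instance: the charge is applied once.
gateway.Charge(idempotencyKey: context.JobId.ToString(), amount: 120m);
var retry = context with { Attempt = 1 };
gateway.Charge(idempotencyKey: retry.JobId.ToString(), amount: 120m);
Console.WriteLine(gateway.TotalCharged); // 120

// Local copy of the record shape from the text above.
public sealed record JobContext(
    Guid JobId, string JobType, DateTimeOffset ScheduledAt,
    int Attempt, int MaxAttempts, string? CorrelationId, string? TenantId);

// Hypothetical external system that deduplicates requests by idempotency key.
public sealed class FakePaymentGateway
{
    private readonly HashSet<string> _seenKeys = new();

    public decimal TotalCharged { get; private set; }

    public bool Charge(string idempotencyKey, decimal amount)
    {
        if (!_seenKeys.Add(idempotencyKey)) return false; // duplicate request: ignore
        TotalCharged += amount;
        return true;
    }
}
```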
Registration Model
With Pragmatic.Composition (Recommended)
When using Pragmatic.Composition, the SG auto-detects Pragmatic.Jobs via FeatureDetector and generates host registration in PragmaticHost.g.cs:
```csharp
// Auto-generated -- no manual registration needed
services.AddPragmaticJobs();
services.AddJobProcessingServices();
```

Configure options via UseJobs():
```csharp
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);
        jobs.WithPollingInterval(5);
        jobs.UseEfCore();
    });
});
```

Standalone (Without Composition)
```csharp
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddPragmaticJobs(jobs =>
{
    jobs.WithWorkerCount(2);
    jobs.WithPollingInterval(10);
});

// Register processing services manually
builder.Services.AddJobProcessingServices();
```

Configuration Options
| Option | Default | Description |
|---|---|---|
| WorkerCount | 2 | Number of concurrent job processing tasks |
| PollingIntervalSeconds | 5 | Seconds between polling for pending jobs |
| LeaseTimeSeconds | 300 | Lease duration for distributed locking (5 min) |
| BatchSize | 10 | Maximum pending jobs to fetch per poll cycle |
| DefaultMaxRetries | 1 | Default max retries for jobs without [Retry] |
| WorkerId | auto-generated | Worker ID for lease acquisition |
| UseEfCore | false | Enable EF Core persistence |
What Gets Generated
For a project with two jobs (a recurring DailyReportJob and an on-demand SendReminderJob), the SG produces five kinds of files: one invoker per job, plus four aggregate files.
Per-Job: Invoker
DailyReportJob.Invoker.g.cs wraps your ExecuteAsync with the configured resilience pipeline:
```csharp
// Simplified illustration
partial class DailyReportJob
{
    internal sealed class Invoker
    {
        public async Task InvokeAsync(JobContext context, CancellationToken ct)
        {
            // Timeout: linked CancellationToken
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(TimeSpan.FromSeconds(600));

            // Retry loop (inline, no Polly dependency)
            for (var attempt = 0; attempt <= 3; attempt++)
            {
                try
                {
                    using var activity = JobsDiagnostics.ActivitySource.StartActivity("DailyReportJob");
                    var sw = Stopwatch.StartNew();

                    await job.ExecuteAsync(context with { Attempt = attempt }, cts.Token);

                    JobsDiagnostics.JobDuration.Record(sw.ElapsedMilliseconds);
                    JobsDiagnostics.JobsCompleted.Add(1);
                    return;
                }
                catch (Exception) when (attempt < 3)
                {
                    JobsDiagnostics.JobsRetried.Add(1);
                    var delay = CalculateDelay(attempt, 1000, BackoffStrategy.ExponentialWithJitter);
                    await Task.Delay(delay, ct);
                }
            }
        }
    }
}
```

Key details:
- The retry loop is fully inlined: no Polly, no runtime reflection.
- [Timeout] generates a linked CancellationTokenSource with CancelAfter.
- [Retry] generates the loop with the configured BackoffStrategy and BaseDelayMs.
- OpenTelemetry spans and metrics are always emitted.
Aggregate: Registration
_Infra.Jobs.Registration.g.cs registers all discovered jobs and their invokers in DI:

```csharp
public static IServiceCollection AddPragmaticJobs(this IServiceCollection services)
{
    services.AddTransient<DailyReportJob>();
    services.AddTransient<DailyReportJob.Invoker>();
    services.AddTransient<SendReminderJob>();
    services.AddTransient<SendReminderJob.Invoker>();
    return services;
}
```

Aggregate: Type Registry
_Infra.Jobs.TypeRegistry.g.cs maps job FQN strings to factories, using an AOT-safe switch expression that replaces Type.GetType():

```csharp
public sealed class PragmaticJobTypeRegistry : IJobTypeRegistry
{
    public async Task ExecuteAsync(string jobTypeFqn, string? parametersJson,
        JobContext context, IServiceProvider sp, CancellationToken ct)
    {
        switch (jobTypeFqn)
        {
            case "MyApp.Jobs.DailyReportJob":
                var job1 = sp.GetRequiredService<DailyReportJob.Invoker>();
                await job1.InvokeAsync(context, ct);
                break;
            case "MyApp.Jobs.SendReminderJob":
                var job2 = sp.GetRequiredService<SendReminderJob.Invoker>();
                await job2.InvokeAsync(parametersJson, context, ct);
                break;
            default:
                throw new InvalidOperationException($"Unknown job: {jobTypeFqn}");
        }
    }
}
```

Aggregate: Recurring Jobs
_Infra.Jobs.RecurringJobs.g.cs registers recurring definitions at startup:

```csharp
public static void RegisterRecurringJobs(IRecurringJobStore store)
{
    store.UpsertAsync(new RecurringJobDefinition
    {
        Id = "daily-report",
        JobType = "MyApp.Jobs.DailyReportJob",
        CronExpression = "0 2 * * *",
        TimeZoneId = null, // UTC
        IsEnabled = true
    });
}
```

Aggregate: Metadata
_Metadata.Jobs.g.cs exposes job metadata for host aggregation and tooling.
Attributes Reference
[Job]
Marks a class as a background job. Required for SG discovery on non-recurring jobs.

```csharp
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class JobAttribute : Attribute;
```

[RecurringJob]
Marks a class as a recurring job with cron scheduling.
| Property | Type | Default | Description |
|---|---|---|---|
| CronExpression | string | (required) | 5-part cron expression |
| Id | string? | auto-generated (kebab-case) | Unique recurring job ID |
| TimeZone | string? | UTC | IANA timezone for cron evaluation |
[Retry]
Configures retry behavior. The SG generates an inline retry loop in the invoker.
| Property | Type | Default | Description |
|---|---|---|---|
| MaxAttempts | int | 3 | Maximum retry attempts |
| Strategy | BackoffStrategy | Exponential | Fixed, Exponential, ExponentialWithJitter |
| BaseDelayMs | int | 1000 | Base delay between retries (ms) |
Backoff formulas:
| Strategy | Formula |
|---|---|
| Fixed | baseDelayMs |
| Exponential | baseDelayMs * 2^attempt |
| ExponentialWithJitter | baseDelayMs * 2^attempt + Random(0, baseDelayMs) |
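The formulas in the table translate almost directly into code. The following is a sketch under stated assumptions, not the generator's actual CalculateDelay: the Backoff class is a hypothetical container, attempt is taken to be zero-based as in JobContext, and Random(0, baseDelayMs) is read as a uniform integer with an exclusive upper bound.

```csharp
using System;

// Usage: delays before the first three retries with a 1000 ms base.
for (var attempt = 0; attempt < 3; attempt++)
{
    var fixedDelay = Backoff.CalculateDelay(attempt, 1000, BackoffStrategy.Fixed);
    var expDelay = Backoff.CalculateDelay(attempt, 1000, BackoffStrategy.Exponential);
    Console.WriteLine(
        $"attempt {attempt}: fixed={fixedDelay.TotalMilliseconds} ms, exponential={expDelay.TotalMilliseconds} ms");
}

public enum BackoffStrategy { Fixed, Exponential, ExponentialWithJitter }

public static class Backoff
{
    // Mirrors the formulas in the table above; `attempt` is zero-based.
    public static TimeSpan CalculateDelay(int attempt, int baseDelayMs, BackoffStrategy strategy)
    {
        // Computed as a double so large attempt numbers do not overflow an int.
        double exponential = baseDelayMs * Math.Pow(2, attempt);

        double delayMs = strategy switch
        {
            BackoffStrategy.Fixed => baseDelayMs,
            BackoffStrategy.Exponential => exponential,
            // Jitter spreads out retries from workers that failed at the same moment.
            BackoffStrategy.ExponentialWithJitter => exponential + Random.Shared.Next(0, baseDelayMs),
            _ => throw new ArgumentOutOfRangeException(nameof(strategy))
        };

        return TimeSpan.FromMilliseconds(delayMs);
    }
}
```

With a 1000 ms base, the exponential strategy waits 1000, 2000, then 4000 ms, and the jittered variant adds up to one extra base delay on top of each.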
[Timeout]
Sets an execution timeout per attempt.
| Property | Type | Default | Description |
|---|---|---|---|
| TimeoutSeconds | int | 300 | Timeout per execution attempt (seconds) |
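A per-attempt timeout of this kind is typically built from a linked CancellationTokenSource. The sketch below shows the pattern in isolation; TimeoutSketch and RunWithTimeoutAsync are hypothetical names, and the way it separates a timeout from caller cancellation is an assumption about reasonable behavior, not a description of the generated invoker.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Usage: a 50 ms timeout cancels work that would otherwise take 5 seconds.
var timedOut = await TimeoutSketch.RunWithTimeoutAsync(
    token => Task.Delay(TimeSpan.FromSeconds(5), token),
    TimeSpan.FromMilliseconds(50),
    CancellationToken.None);
Console.WriteLine($"timed out: {timedOut}");

public static class TimeoutSketch
{
    // Returns true if the attempt hit its timeout, false if it finished in time.
    // Cancellation requested by the caller is rethrown rather than reported as a timeout.
    public static async Task<bool> RunWithTimeoutAsync(
        Func<CancellationToken, Task> attempt,
        TimeSpan timeout,
        CancellationToken ct)
    {
        // The linked source cancels when the caller's token fires or the timeout elapses.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(timeout);

        try
        {
            await attempt(cts.Token);
            return false;
        }
        catch (OperationCanceledException) when (!ct.IsCancellationRequested)
        {
            // The caller did not cancel, so the timeout must have fired.
            return true;
        }
    }
}
```

Creating a fresh linked source for each attempt gives every retry its own full time budget, while host shutdown still cancels the work through the caller's token.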
[Continuation]
Declares automatic job chaining on successful completion. The next job is enqueued with the same CorrelationId.

```csharp
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob : IJob<InvoiceParams> { ... }
```

Persistence Stores
| Store | Package | Use Case |
|---|---|---|
| InMemoryJobStore | Pragmatic.Jobs | Development and testing (default) |
| InMemoryRecurringJobStore | Pragmatic.Jobs | Development and testing (default) |
| EfCoreJobStore | Pragmatic.Jobs.EFCore | Production with database persistence |
| EfCoreRecurringJobStore | Pragmatic.Jobs.EFCore | Production with database persistence |
Distributed Locking
Multiple app instances can process jobs safely. The lease-based locking uses optimistic SQL:

```sql
UPDATE __Jobs
SET LeasedBy = @worker, LeaseExpiresAt = @lease
WHERE Id = @id
  AND (LeasedBy IS NULL OR LeaseExpiresAt < @now)
```

Only one worker acquires each job. If a worker crashes, the lease expires after LeaseTimeSeconds and another worker picks up the job.
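The same predicate can be modeled in memory to see how leases behave over time. This is only an illustration: InMemoryLeaseStore is a hypothetical class (not the library's InMemoryJobStore), and the lock stands in for the atomicity that a single UPDATE statement provides in the database.

```csharp
using System;
using System.Collections.Generic;

var store = new InMemoryLeaseStore();
var jobId = Guid.NewGuid();
var now = DateTimeOffset.UtcNow;
var lease = TimeSpan.FromSeconds(300);

Console.WriteLine(store.TryAcquireLease(jobId, "worker-1", now, lease));                 // True: first claim
Console.WriteLine(store.TryAcquireLease(jobId, "worker-2", now.AddSeconds(10), lease));  // False: lease still held
Console.WriteLine(store.TryAcquireLease(jobId, "worker-2", now.AddSeconds(301), lease)); // True: lease expired

public sealed class InMemoryLeaseStore
{
    private readonly object _gate = new();
    private readonly Dictionary<Guid, (string LeasedBy, DateTimeOffset LeaseExpiresAt)> _leases = new();

    // Same predicate as the SQL WHERE clause: claim only if unleased or the lease has expired.
    public bool TryAcquireLease(Guid jobId, string workerId, DateTimeOffset now, TimeSpan leaseTime)
    {
        lock (_gate) // stands in for the atomicity of a single UPDATE statement
        {
            if (_leases.TryGetValue(jobId, out var current) && current.LeaseExpiresAt >= now)
                return false; // another worker holds a live lease

            _leases[jobId] = (workerId, now + leaseTime);
            return true;
        }
    }
}
```

In the SQL version, the equivalent of the boolean result is the affected-row count: one row means the worker won the lease, zero means another worker got there first.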
Database Tables
| Table | Purpose |
|---|---|
| __Jobs | Job instances (pending, running, completed, failed, cancelled) |
| __RecurringJobs | Recurring job definitions and next-run tracking |
Background Services
Two hosted services drive the job infrastructure:
| Service | Responsibility |
|---|---|
| JobProcessorService | Polls IJobStore for pending jobs, acquires leases, invokes via IJobTypeRegistry |
| RecurringJobSchedulerService | Evaluates cron expressions against IRecurringJobStore, enqueues when due |
Both are registered by AddJobProcessingServices() (or automatically by the SG in Composition mode). Both skip execution if IJobTypeRegistry is NullJobTypeRegistry (meaning no jobs were discovered).
Observability
OpenTelemetry
ActivitySource: Pragmatic.Jobs
Every job execution creates a span with tags: job.id, job.type, job.attempt.
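To see what such a span looks like, the sketch below creates one with the same source name and tag keys and prints it through a plain ActivityListener from System.Diagnostics. It does not touch the library: the ActivitySource here is a local stand-in, the span name and tag values are made up, and only the source name and tag keys come from the text above.

```csharp
using System;
using System.Diagnostics;

// Without a registered listener (or an OpenTelemetry SDK subscription), StartActivity returns null.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Pragmatic.Jobs",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
    ActivityStopped = stopped =>
    {
        Console.WriteLine($"span: {stopped.DisplayName}");
        foreach (var tag in stopped.TagObjects)
            Console.WriteLine($"  {tag.Key} = {tag.Value}");
    }
};
ActivitySource.AddActivityListener(listener);

// Local stand-in for the library's source; only the name is taken from the documentation.
using var jobsSource = new ActivitySource("Pragmatic.Jobs");

using (var activity = jobsSource.StartActivity("DailyReportJob"))
{
    activity?.SetTag("job.id", Guid.NewGuid());
    activity?.SetTag("job.type", "MyApp.Jobs.DailyReportJob");
    activity?.SetTag("job.attempt", 0);
}
```

In an application that uses the OpenTelemetry SDK, the usual equivalent of the listener is subscribing to the source by name when configuring tracing.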
Metrics
| Metric | Type | Description |
|---|---|---|
| pragmatic.jobs.enqueued | Counter | Total jobs enqueued |
| pragmatic.jobs.completed | Counter | Total jobs completed successfully |
| pragmatic.jobs.failed | Counter | Total jobs that failed permanently |
| pragmatic.jobs.retried | Counter | Total retry attempts |
| pragmatic.jobs.duration | Histogram (ms) | Job execution duration |
| pragmatic.jobs.lease_acquisitions | Counter | Total lease acquisitions |
| pragmatic.jobs.lease_conflicts | Counter | Lease conflicts (another worker won) |
| pragmatic.jobs.recurring_triggered | Counter | Recurring job triggers |
Ecosystem Integration
Messaging.Jobs Bridge
Pragmatic.Messaging.Jobs bridges the two systems, enabling scheduled message delivery:

```csharp
app.UseMessaging(msg =>
{
    msg.EnableScheduledMessages(); // Requires Pragmatic.Messaging.Jobs
});

// Schedule a message for future delivery
var scheduleId = await messageScheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    delay: TimeSpan.FromHours(24));
```

Internally, this creates a PublishMessageJob that publishes the message to the bus when the scheduled time arrives.
Showcase Examples
The Showcase application demonstrates both job types:
- NoShowDetectionJob (examples/showcase/src/Showcase.Booking/Infrastructure/Jobs/): recurring hourly job that detects no-show reservations.
- SendCheckInReminderJob (examples/showcase/src/Showcase.Booking/Infrastructure/Jobs/): delayed job with typed parameters, scheduled 24h before check-in.
Packages
| Package | Description |
|---|---|
| Pragmatic.Jobs | Core: IJob, IJob<T>, IJobScheduler, InMemory stores, background services, attributes |
| Pragmatic.Jobs.EFCore | EF Core persistence: EfCoreJobStore, EfCoreRecurringJobStore, entity config |