Pragmatic.Jobs
AOT-safe background job scheduling for .NET 10. Recurring cron jobs, delayed fire-and-forget, continuation chains, distributed locking — all source-generated at compile time.
The Problem
Background job processing in .NET typically means pulling in Hangfire or Quartz.NET. Both are capable, but both rely on runtime reflection for job discovery, serialization, and invocation. They require separate dashboard packages, separate storage providers, and separate retry configuration that lives outside your job definition. When a job needs retry with exponential backoff, you configure it in a fluent API on one side and write the job logic on the other; the two are disconnected.

```csharp
// Without Pragmatic: Hangfire-style, config is separate from the job
RecurringJob.AddOrUpdate<DailyReportJob>(
    "daily-report",
    x => x.Execute(), // Expression tree → reflection
    "0 2 * * *",
    new RecurringJobOptions { TimeZone = TimeZoneInfo.FindSystemTimeZoneById("Europe/Rome") });

// Retry is configured globally or per-filter, not on the job itself
GlobalJobFilters.Filters.Add(new AutomaticRetryAttribute { Attempts = 3 });
```

The job class has no idea how it will be scheduled, retried, or timed out. If you want different retry strategies for different jobs, you need per-job filter attributes that are evaluated at runtime.
The Solution
With Pragmatic.Jobs, you declare scheduling, retry, and timeout directly on the job class. The source generator (SG) produces the invoker with inline retry loops, linked cancellation tokens, telemetry, and DI wiring, all at compile time with zero reflection.

```csharp
[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 1000)]
[Timeout(TimeoutSeconds = 600)]
public sealed partial class DailyReportJob(IReportService reports) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
}
```

That is the entire job definition. The SG generates:

- `DailyReportJob.Invoker.g.cs` with retry loop + timeout + telemetry
- Registration in `_Infra.Jobs.Registration.g.cs`
- Type entry in `_Infra.Jobs.TypeRegistry.g.cs` (AOT-safe, no `Type.GetType`)
- Recurring definition in `_Infra.Jobs.RecurringJobs.g.cs`
- Metadata in `_Metadata.Jobs.g.cs`
Installation
```bash
dotnet add package Pragmatic.Jobs
dotnet add package Pragmatic.Jobs.EFCore   # For production persistence
```

Add the source generator:

```xml
<ProjectReference Include="..\Pragmatic.SourceGenerator\src\Pragmatic.SourceGenerator\Pragmatic.SourceGenerator.csproj"
                  OutputItemType="Analyzer"
                  ReferenceOutputAssembly="false" />
```

Quick Start
1. Define a job

```csharp
using Pragmatic.Jobs;
using Pragmatic.Jobs.Attributes;

[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3)]
public sealed partial class DailyReportJob(IReportService reports) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
}
```

2. Configure the host

```csharp
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);
        jobs.WithPollingInterval(5);
        jobs.UseEfCore();
    });
});
```

3. Schedule on demand (optional)

```csharp
var jobId = await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, "guest@hotel.com"),
    delay: TimeSpan.FromHours(24));
```

The SG auto-discovers jobs by their `[Job]` and `[RecurringJob]` attributes. No manual registration is needed.
Job Types
IJob: Parameterless Jobs

For jobs that need no input parameters: recurring cleanup, report generation, health checks.

```csharp
public interface IJob
{
    Task ExecuteAsync(JobContext context, CancellationToken ct);
}
```

```csharp
[RecurringJob("0 0 * * 0", Id = "cleanup-sessions", TimeZone = "Europe/Rome")]
public sealed partial class CleanupExpiredSessionsJob(AppDbContext db) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        var cutoff = context.ScheduledAt.AddDays(-30);
        await db.Sessions.Where(s => s.ExpiresAt < cutoff).ExecuteDeleteAsync(ct);
    }
}
```

IJob<T>: Typed Parameter Jobs
For jobs that require input data. The parameter type must be JSON-serializable.

```csharp
public interface IJob<in TParams> where TParams : notnull
{
    Task ExecuteAsync(TParams parameters, JobContext context, CancellationToken ct);
}
```

```csharp
public record InvoiceParams(Guid ReservationId, Guid GuestId, decimal Amount, string Currency);

[Job]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 2000)]
[Timeout(TimeoutSeconds = 120)]
public sealed partial class GenerateInvoiceJob(IBillingService billing) : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
        => await billing.GenerateAsync(p.ReservationId, p.Amount, p.Currency, ct);
}
```

Recurring Jobs (Cron)
Recurring jobs use standard 5-part cron expressions and run on a fixed schedule.

```csharp
[RecurringJob("0 2 * * *", Id = "daily-report")]
//             │ │ │ │ │
//             │ │ │ │ └── Day of week (0-6, Sun=0)
//             │ │ │ └──── Month (1-12)
//             │ │ └────── Day of month (1-31)
//             │ └──────── Hour (0-23)
//             └────────── Minute (0-59)
```

| Expression | Schedule |
|---|---|
| `0 2 * * *` | Daily at 2:00 AM |
| `0 0 * * 0` | Every Sunday at midnight |
| `*/15 * * * *` | Every 15 minutes |
| `0 9 1 * *` | First day of every month at 9:00 AM |
| `0 0 * * 1-5` | Every weekday at midnight |
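
To make the field semantics concrete, here is a minimal sketch of how a 5-part expression could be matched against a point in time. It is not the library's cron evaluator: `CronSketch`, `FieldMatches`, and `IsDue` are hypothetical names, only `*`, `*/n`, `a-b`, and plain numbers are handled, and the day-of-month and day-of-week fields are simply AND-ed together, which is a simplification of standard cron behavior.

```csharp
using System;

// Hypothetical helper for illustration; not part of Pragmatic.Jobs.
static class CronSketch
{
    // Matches one field. "min" is the lowest legal value of the field,
    // so that "*/n" steps start from it (0 for minutes, 1 for days).
    private static bool FieldMatches(string field, int value, int min)
    {
        if (field == "*") return true;
        if (field.StartsWith("*/", StringComparison.Ordinal))
            return (value - min) % int.Parse(field[2..]) == 0;

        var dash = field.IndexOf('-');
        if (dash > 0)
        {
            var low = int.Parse(field[..dash]);
            var high = int.Parse(field[(dash + 1)..]);
            return value >= low && value <= high;
        }
        return int.Parse(field) == value;
    }

    // True when "time" falls on the schedule described by the expression.
    public static bool IsDue(string cron, DateTime time)
    {
        var parts = cron.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != 5)
            throw new FormatException("Expected 5 cron fields.");

        return FieldMatches(parts[0], time.Minute, 0)
            && FieldMatches(parts[1], time.Hour, 0)
            && FieldMatches(parts[2], time.Day, 1)
            && FieldMatches(parts[3], time.Month, 1)
            && FieldMatches(parts[4], (int)time.DayOfWeek, 0); // Sunday = 0
    }
}
```

For example, `CronSketch.IsDue("*/15 * * * *", new DateTime(2025, 1, 6, 10, 30, 0))` returns `true`, while the same call at 10:31 returns `false`.
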

Timezone support: The `TimeZone` property accepts IANA timezone names.

```csharp
[RecurringJob("0 0 * * 0", Id = "cleanup", TimeZone = "Europe/Rome")]
public sealed partial class WeeklyCleanupJob : IJob { ... }
```

Auto-generated ID: If `Id` is not specified, the SG generates a kebab-cased ID from the class name (`DailyReportJob` becomes `daily-report`).
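
The `DailyReportJob` to `daily-report` mapping suggests a rule along the lines of "drop the `Job` suffix, then hyphenate at uppercase letters". The sketch below implements that guess; `RecurringJobId.FromClassName` is a hypothetical helper, and the generator's real rules (for acronyms, digits, or classes without the suffix) may differ.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; the generator's actual rules are not documented here.
static class RecurringJobId
{
    public static string FromClassName(string className)
    {
        // Drop a trailing "Job" suffix, as the DailyReportJob example implies.
        var name = className.Length > 3 && className.EndsWith("Job", StringComparison.Ordinal)
            ? className[..^3]
            : className;

        var sb = new StringBuilder(name.Length + 4);
        for (var i = 0; i < name.Length; i++)
        {
            var c = name[i];
            // Insert a hyphen before every uppercase letter except the first character.
            if (i > 0 && char.IsUpper(c))
                sb.Append('-');
            sb.Append(char.ToLowerInvariant(c));
        }
        return sb.ToString();
    }
}
```

With this rule, `RecurringJobId.FromClassName("DailyReportJob")` yields `daily-report`. Consecutive capitals are split letter by letter, so setting `Id` explicitly is the safer choice for class names that contain acronyms.
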
Delayed / Fire-and-Forget Jobs
Jobs decorated with `[Job]` (not `[RecurringJob]`) are scheduled programmatically.

```csharp
[Job]
public sealed partial class SendReminderJob : IJob<ReminderParams> { ... }

// Fire-and-forget (immediate)
var immediateId = await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, email));

// Delayed (runs in 24 hours)
var delayedId = await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, email),
    delay: TimeSpan.FromHours(24));

// At a specific time
var scheduledId = await scheduler.ScheduleAtAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, email),
    scheduledFor: checkout.AddHours(-2));
```

Continuation Chains
Jobs can declare automatic continuation: when job A completes, job B is enqueued.

```csharp
[Job]
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
    {
        // Generate invoice...
    }
    // On success, SendInvoiceEmailJob is automatically enqueued
    // with the same CorrelationId for tracing.
}

[Job]
[Retry(MaxAttempts = 2)]
public sealed partial class SendInvoiceEmailJob : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        // Send email using context.CorrelationId to find the invoice
    }
}
```

The SG validates the chain at compile time:

- The continuation type must implement `IJob` or `IJob<T>` (PRAG2505)
- No cycles allowed: A->B->A or A->B->C->A are compile errors (PRAG2506)
- Chain depth is checked via DAG traversal
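
The cycle check can be pictured as a depth-first walk over "job to continuation" edges. The following is a sketch of that general technique, not the generator's code: `ContinuationGraph.HasCycle` is a hypothetical name, and jobs are identified by plain strings rather than Roslyn symbols.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of cycle detection over continuation edges.
static class ContinuationGraph
{
    private enum Mark { Visiting, Done }

    // Returns true if following continuations from any job leads back
    // to a job that is still on the current path (A->B->A, A->B->C->A, ...).
    public static bool HasCycle(IReadOnlyDictionary<string, string[]> continuations)
    {
        var marks = new Dictionary<string, Mark>();
        foreach (var job in continuations.Keys)
            if (Visit(job, continuations, marks))
                return true;
        return false;
    }

    private static bool Visit(
        string job,
        IReadOnlyDictionary<string, string[]> continuations,
        Dictionary<string, Mark> marks)
    {
        if (marks.TryGetValue(job, out var mark))
            return mark == Mark.Visiting; // reached a job on the current path: cycle

        marks[job] = Mark.Visiting;
        if (continuations.TryGetValue(job, out var next))
            foreach (var n in next)
                if (Visit(n, continuations, marks))
                    return true;

        marks[job] = Mark.Done; // fully explored, safe to reach again from other paths
        return false;
    }
}
```

A graph with `A -> B` and `B -> A` reports a cycle, while `A -> B`, `B -> C` does not.
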
Attributes Reference
[Job]
Marks a class as a background job. Required for SG discovery on non-recurring jobs.

```csharp
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class JobAttribute : Attribute;
```

[RecurringJob]
Marks a class as a recurring job with cron scheduling.

```csharp
[RecurringJob("0 2 * * *", Id = "daily-report", TimeZone = "Europe/Rome")]
```

| Property | Type | Default | Description |
|---|---|---|---|
| `CronExpression` | `string` | (required) | 5-part cron expression |
| `Id` | `string?` | auto-generated | Unique recurring job ID |
| `TimeZone` | `string?` | UTC | IANA timezone for cron evaluation |
[Retry]
Configures retry behavior. The SG generates an inline retry loop in the invoker.

```csharp
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 1000)]
```

| Property | Type | Default | Description |
|---|---|---|---|
| `MaxAttempts` | `int` | 3 | Maximum retry attempts |
| `Strategy` | `BackoffStrategy` | `Exponential` | `Fixed`, `Exponential`, `ExponentialWithJitter` |
| `BaseDelayMs` | `int` | 1000 | Base delay between retries (ms) |

Backoff strategies:

| Strategy | Formula |
|---|---|
| `Fixed` | `baseDelayMs` |
| `Exponential` | `baseDelayMs * 2^attempt` |
| `ExponentialWithJitter` | `baseDelayMs * 2^attempt + Random(0, baseDelayMs)` |
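
The three formulas translate directly into code. The sketch below is one way to write them, assuming a zero-based `attempt` as in `JobContext.Attempt`; the `BackoffStrategy` enum is redeclared locally so the snippet compiles on its own, and the `Backoff` class is a hypothetical stand-in rather than the library's implementation.

```csharp
using System;

// Local copy of the enum names from the table, so the sketch is self-contained.
enum BackoffStrategy { Fixed, Exponential, ExponentialWithJitter }

// Hypothetical illustration of the formulas in the table above.
static class Backoff
{
    public static TimeSpan CalculateDelay(int attempt, int baseDelayMs, BackoffStrategy strategy)
    {
        // baseDelayMs * 2^attempt, computed as a long; the shift is capped to avoid overflow.
        long exponential = baseDelayMs * (1L << Math.Min(attempt, 30));

        long ms = strategy switch
        {
            BackoffStrategy.Fixed => baseDelayMs,
            BackoffStrategy.Exponential => exponential,
            // Jitter adds a random amount in [0, baseDelayMs) so workers do not retry in lockstep.
            BackoffStrategy.ExponentialWithJitter => exponential + Random.Shared.Next(0, baseDelayMs),
            _ => throw new ArgumentOutOfRangeException(nameof(strategy)),
        };
        return TimeSpan.FromMilliseconds(ms);
    }
}
```

With `baseDelayMs = 1000`, the `Exponential` strategy gives 1000 ms, 2000 ms, and 4000 ms for attempts 0, 1, and 2; `ExponentialWithJitter` adds less than one extra second to each of those.
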
[Timeout]
Sets an execution timeout. The SG generates a linked `CancellationToken` in the invoker.

```csharp
[Timeout(TimeoutSeconds = 600)]
```

| Property | Type | Default | Description |
|---|---|---|---|
| `TimeoutSeconds` | `int` | 300 | Timeout per execution attempt (seconds) |
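
A linked token means the job is cancelled either when the host shuts down or when the timeout elapses, whichever comes first. The sketch below shows that pattern with standard .NET APIs only; `TimeoutSketch.RunWithTimeoutAsync` is a hypothetical helper, and the generated invoker may structure this differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of the linked-token timeout pattern.
static class TimeoutSketch
{
    // Returns true if the work finished, false if the timeout fired first.
    // Cancellation requested by the host itself is rethrown to the caller.
    public static async Task<bool> RunWithTimeoutAsync(
        Func<CancellationToken, Task> work,
        TimeSpan timeout,
        CancellationToken hostToken)
    {
        // The linked source is cancelled when hostToken is cancelled...
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(hostToken);
        // ...or when the timeout elapses.
        cts.CancelAfter(timeout);

        try
        {
            await work(cts.Token);
            return true;
        }
        catch (OperationCanceledException) when (!hostToken.IsCancellationRequested)
        {
            return false; // the timeout, not a host shutdown, stopped the work
        }
    }
}
```

The exception filter is the design point here: it lets a job runner tell a timed-out attempt (which may be retried) apart from a shutdown (which should stop processing).
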
[Continuation<TNextJob>]
Declares automatic job chaining on successful completion.

```csharp
[Continuation<SendInvoiceEmailJob>]
```

The next job is enqueued immediately after the current job completes. The `CorrelationId` is carried forward.
IJobScheduler API
The `IJobScheduler` interface is the primary API for scheduling on-demand jobs.

```csharp
public interface IJobScheduler
{
    // Parameterless: immediate, delayed, or at a specific time
    Task<Guid> ScheduleAsync<TJob>(
        TimeSpan? delay = null,
        string? correlationId = null,
        CancellationToken ct = default)
        where TJob : IJob;

    Task<Guid> ScheduleAtAsync<TJob>(
        DateTimeOffset scheduledFor,
        string? correlationId = null,
        CancellationToken ct = default)
        where TJob : IJob;

    // With parameters
    Task<Guid> ScheduleAsync<TJob, TParams>(
        TParams parameters,
        TimeSpan? delay = null,
        string? correlationId = null,
        CancellationToken ct = default)
        where TJob : IJob<TParams>
        where TParams : notnull;

    Task<Guid> ScheduleAtAsync<TJob, TParams>(
        TParams parameters,
        DateTimeOffset scheduledFor,
        string? correlationId = null,
        CancellationToken ct = default)
        where TJob : IJob<TParams>
        where TParams : notnull;

    // Cancel
    Task CancelAsync(Guid jobId, CancellationToken ct = default);
}
```

Scheduling Patterns
```csharp
// Fire-and-forget
await scheduler.ScheduleAsync<CleanupJob>();

// Delayed
await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new(reservationId, email),
    delay: TimeSpan.FromHours(24));

// At a specific time
await scheduler.ScheduleAtAsync<SendReminderJob, ReminderParams>(
    new(reservationId, email),
    scheduledFor: checkout.AddHours(-2));

// With correlation
await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new(reservationId, email),
    delay: TimeSpan.FromHours(24),
    correlationId: $"reservation-{reservationId}");

// Cancel
await scheduler.CancelAsync(jobId);
```

JobContext
Every job execution receives a `JobContext` with metadata about the current invocation.

```csharp
public sealed record JobContext(
    Guid JobId,                  // Unique job instance ID
    string JobType,              // Fully qualified name of the job class
    DateTimeOffset ScheduledAt,  // When scheduled to run
    int Attempt,                 // Current attempt (0 = first)
    int MaxAttempts,             // Max retry attempts configured
    string? CorrelationId,       // Optional correlation for tracing
    string? TenantId);           // Optional tenant for multi-tenancy
```

```csharp
public async Task ExecuteAsync(JobContext context, CancellationToken ct)
{
    _logger.LogInformation(
        "Job {JobId} attempt {Attempt}/{MaxAttempts} for {Date}",
        context.JobId, context.Attempt, context.MaxAttempts, context.ScheduledAt.Date);

    // Use CorrelationId to link with related operations
    using var activity = ActivitySource.StartActivity("GenerateReport");
    activity?.SetTag("correlationId", context.CorrelationId);
}
```

Builder API
PragmaticApp Configuration

```csharp
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);        // Concurrent job tasks
        jobs.WithPollingInterval(5);    // Seconds between polls
        jobs.WithLeaseTime(300);        // Lease duration (5 min)
        jobs.WithMaxRetries(3);         // Default retry for jobs without [Retry]
        jobs.WithBatchSize(10);         // Pending jobs per poll
        jobs.WithWorkerId("worker-1");  // Custom worker ID
        jobs.UseEfCore();               // Production persistence
    });
});
```

Standalone (without Composition)

```csharp
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddPragmaticJobs(jobs =>
{
    jobs.WithWorkerCount(2);
    jobs.WithPollingInterval(10);
});

// Register processing services (Composition does this automatically)
builder.Services.AddJobProcessingServices();

var app = builder.Build();
app.Run();
```

Configuration Options
| Option | Default | Description |
|---|---|---|
| `WorkerCount` | 2 | Number of concurrent job processing tasks |
| `PollingIntervalSeconds` | 5 | Seconds between polling for pending jobs |
| `LeaseTimeSeconds` | 300 | Lease duration for distributed locking |
| `BatchSize` | 10 | Maximum pending jobs to fetch per poll cycle |
| `DefaultMaxRetries` | 1 | Default max retries for jobs without `[Retry]` |
| `WorkerId` | auto-generated | Worker ID for lease acquisition |
| `UseEfCore` | false | Enable EF Core persistence |
EF Core Persistence
In production, use EF Core for durable job storage:

```bash
dotnet add package Pragmatic.Jobs.EFCore
```

```csharp
app.UseJobs(jobs => jobs.UseEfCore());
```

This replaces the default in-memory stores with:

- `EfCoreJobStore` (implements `IJobStore`)
- `EfCoreRecurringJobStore` (implements `IRecurringJobStore`)
Database Tables
| Table | Purpose |
|---|---|
| `__Jobs` | Job instances (pending, running, completed, failed) |
| `__RecurringJobs` | Recurring job definitions and next-run tracking |
Distributed Locking
Multiple app instances can process jobs safely. Lease-based locking relies on an optimistic, conditional `UPDATE`:

```sql
UPDATE __Jobs
SET LeasedBy = @worker, LeaseExpiresAt = @lease
WHERE Id = @id
  AND (LeasedBy IS NULL OR LeaseExpiresAt < @now)
```

Only one worker acquires each job: the worker whose update affects a row holds the lease. If a worker crashes, the lease expires and another worker picks up the job.
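
The same rule can be modelled in memory to see how two workers interact. This is only an illustration of the `WHERE` clause's logic: `JobRow` and `Lease.TryAcquire` are hypothetical names, and the `lock` stands in for the atomicity that a single `UPDATE` statement provides in the database.

```csharp
#nullable enable
using System;

// Hypothetical in-memory stand-in for one row of the __Jobs table.
sealed class JobRow
{
    public string? LeasedBy;
    public DateTimeOffset LeaseExpiresAt;
}

static class Lease
{
    // Mirrors: WHERE LeasedBy IS NULL OR LeaseExpiresAt < @now
    public static bool TryAcquire(JobRow row, string worker, DateTimeOffset now, TimeSpan leaseTime)
    {
        lock (row) // stands in for the atomicity of a single UPDATE
        {
            if (row.LeasedBy is not null && row.LeaseExpiresAt >= now)
                return false; // "0 rows affected": another worker holds a live lease

            row.LeasedBy = worker;
            row.LeaseExpiresAt = now + leaseTime;
            return true;
        }
    }
}
```

If worker A acquires a 5-minute lease at time `t`, worker B's attempt at `t + 1 min` fails, and its attempt at `t + 6 min` succeeds because the lease has expired. This is the crash-recovery path described above.
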
Host Integration
With Pragmatic.Composition, the SG auto-detects Pragmatic.Jobs via `FeatureDetector` and generates host registration:

```csharp
// PragmaticHost.g.cs (auto-generated)
services.AddPragmaticJobs();
services.AddJobProcessingServices();
```

No manual registration is needed. The SG also generates:

- Job type registry (AOT-safe `switch` expression, no `Type.GetType`)
- Recurring job definitions with cron expressions
- DI registration for all discovered jobs
Background Services
Two hosted services drive the job infrastructure:

| Service | Responsibility |
|---|---|
| `JobProcessorService` | Polls `IJobStore` for pending jobs, acquires leases, invokes jobs |
| `RecurringJobSchedulerService` | Evaluates cron expressions, enqueues recurring jobs when due |

Both are registered automatically by AddJobProcessingServices() (or by the SG in Composition mode).
Cross-Module: Messaging.Jobs Bridge
Pragmatic.Messaging.Jobs bridges Pragmatic.Messaging and Pragmatic.Jobs, enabling scheduled message delivery:

```csharp
app.UseMessaging(msg =>
{
    msg.EnableScheduledMessages(); // Requires Pragmatic.Messaging.Jobs
});
```

This registers `IMessageScheduler` backed by `IJobScheduler`:

```csharp
// Schedule a message for future delivery
var scheduleId = await messageScheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    delay: TimeSpan.FromHours(24));

// Or at a specific time
await messageScheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    scheduledAt: checkout.AddHours(-2));

// Cancel
await messageScheduler.CancelAsync(scheduleId);
```

Internally, this creates a `PublishMessageJob` that publishes the message to the bus when the scheduled time arrives.
Observability
OpenTelemetry
ActivitySource: `Pragmatic.Jobs`
Every job execution creates a span with tags for job type, attempt number, correlation ID, and tenant.
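
To inspect these spans without a full OpenTelemetry pipeline, the built-in `ActivityListener` can subscribe to a source by name. The sketch below uses only `System.Diagnostics`; it creates a stand-in `ActivitySource` named `Pragmatic.Jobs` so that it runs on its own, and the `attempt` tag key is an assumption, since the exact tag names are not listed here.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

var seen = new List<string>();

// Subscribe to every activity produced by the "Pragmatic.Jobs" source.
using var listener = new ActivityListener
{
    ShouldListenTo = source => source.Name == "Pragmatic.Jobs",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
    ActivityStopped = activity =>
        seen.Add($"{activity.OperationName} took {activity.Duration.TotalMilliseconds:F1} ms"),
};
ActivitySource.AddActivityListener(listener);

// Stand-in for the library's source, so the sketch is runnable in isolation.
using var source = new ActivitySource("Pragmatic.Jobs");
using (var activity = source.StartActivity("DailyReportJob"))
{
    activity?.SetTag("attempt", 0); // assumed tag key, for illustration only
}

foreach (var line in seen)
    Console.WriteLine(line);
```

In a real host, the stand-in source would be dropped and the listener (or an OpenTelemetry tracer configured with the same source name) would receive the spans emitted by the job invokers.
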
Metrics
| Metric | Type | Description |
|---|---|---|
| `pragmatic.jobs.enqueued` | Counter | Total jobs enqueued |
| `pragmatic.jobs.completed` | Counter | Total jobs completed successfully |
| `pragmatic.jobs.failed` | Counter | Total jobs that failed permanently |
| `pragmatic.jobs.retried` | Counter | Total retry attempts |
| `pragmatic.jobs.duration` | Histogram (ms) | Job execution duration |
| `pragmatic.jobs.lease_acquisitions` | Counter | Total lease acquisitions |
| `pragmatic.jobs.lease_conflicts` | Counter | Lease conflicts (another worker) |
| `pragmatic.jobs.recurring_triggered` | Counter | Recurring job triggers |

Diagnostics
The source generator emits diagnostics for common mistakes:

| ID | Severity | Message |
|---|---|---|
| PRAG2500 | Error | Job class must implement IJob or IJob<T> |
| PRAG2501 | Error | Invalid or empty cron expression |
| PRAG2502 | Warning | Job class should be partial |
| PRAG2503 | Error | Duplicate recurring job ID |
| PRAG2504 | Error | [Retry] with MaxAttempts <= 0 |
| PRAG2505 | Error | [Continuation<T>] where T does not implement IJob |
| PRAG2506 | Error | Continuation cycle detected (A->B->A) |
Persistence Stores
| Store | Package | Use Case |
|---|---|---|
| `InMemoryJobStore` | Pragmatic.Jobs | Development and testing (default) |
| `InMemoryRecurringJobStore` | Pragmatic.Jobs | Development and testing (default) |
| `EfCoreJobStore` | Pragmatic.Jobs.EFCore | Production with database persistence |
| `EfCoreRecurringJobStore` | Pragmatic.Jobs.EFCore | Production with database persistence |

Packages
| Package | Description |
|---|---|
| Pragmatic.Jobs | Core: `IJob`, `IJob<T>`, `IJobScheduler`, in-memory stores, background services, attributes |
| Pragmatic.Jobs.EFCore | EF Core persistence: `EfCoreJobStore`, `EfCoreRecurringJobStore`, entity config |

SG-Generated Output
For a project with two jobs, the SG produces:

```
obj/GeneratedFiles/Pragmatic.SourceGenerator/
├── DailyReportJob.Invoker.g.cs        # Retry + timeout + telemetry
├── SendReminderJob.Invoker.g.cs       # Retry + telemetry
├── _Infra.Jobs.Registration.g.cs      # DI registration for all jobs
├── _Infra.Jobs.TypeRegistry.g.cs      # AOT-safe type name → factory
├── _Infra.Jobs.RecurringJobs.g.cs     # Recurring job definitions
└── _Metadata.Jobs.g.cs                # Job metadata for tooling
```

The invoker wraps the original `ExecuteAsync` with the configured resilience pipeline:

```csharp
// Simplified illustration of generated DailyReportJob.Invoker.g.cs
partial class DailyReportJob
{
    internal sealed class Invoker
    {
        public async Task InvokeAsync(JobContext context, CancellationToken ct)
        {
            // Timeout
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(TimeSpan.FromSeconds(600));

            // Retry loop
            for (var attempt = 0; attempt <= 3; attempt++)
            {
                try
                {
                    using var activity = JobsDiagnostics.ActivitySource.StartActivity("DailyReportJob");
                    var sw = Stopwatch.StartNew();

                    await job.ExecuteAsync(context with { Attempt = attempt }, cts.Token);

                    JobsDiagnostics.JobDuration.Record(sw.ElapsedMilliseconds);
                    JobsDiagnostics.JobsCompleted.Add(1);
                    return;
                }
                catch (Exception) when (attempt < 3)
                {
                    JobsDiagnostics.JobsRetried.Add(1);
                    var delay = CalculateDelay(attempt, 1000, BackoffStrategy.ExponentialWithJitter);
                    await Task.Delay(delay, ct);
                }
            }
        }
    }
}
```

Anti-Patterns
| Avoid | Prefer |
|---|---|
| `ISourceGenerator` | `IIncrementalGenerator` (SG handles this) |
| Reflection-based job discovery | `[Job]` / `[RecurringJob]` attributes |
| Global retry filters | `[Retry]` per-job |
| External scheduler configuration | Attributes on the job class |
| Manual DI registration | SG-generated registration |
| `Type.GetType()` for deserialization | SG-generated `IJobTypeRegistry` |

Complete Example
```csharp
[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter)]
[Timeout(TimeoutSeconds = 600)]
public sealed partial class DailyReportJob(
    IReportService reports,
    ILogger<DailyReportJob> logger) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        logger.LogInformation("Generating report for {Date}", context.ScheduledAt.Date);
        await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
    }
}

// Jobs/GenerateInvoiceJob.cs
public record InvoiceParams(Guid ReservationId, decimal Amount, string Currency);

[Job]
[Retry(MaxAttempts = 3)]
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob(IBillingService billing) : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
        => await billing.GenerateAsync(p.ReservationId, p.Amount, p.Currency, ct);
}

[Job]
[Retry(MaxAttempts = 2)]
public sealed partial class SendInvoiceEmailJob(IEmailService email) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await email.SendInvoiceAsync(context.CorrelationId!, ct);
}

// Program.cs
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);
        jobs.WithPollingInterval(5);
        jobs.UseEfCore();
    });
});
```