
Pragmatic.Jobs

AOT-safe background job scheduling for .NET 10. Recurring cron jobs, delayed fire-and-forget, continuation chains, distributed locking — all source-generated at compile time.

Background job processing in .NET typically means pulling in Hangfire or Quartz.NET. Both are capable, but both rely on runtime reflection for job discovery, serialization, and invocation. They require separate dashboard packages, separate storage providers, and separate retry configuration that lives outside your job definition. When a job needs retry with exponential backoff, you configure it in a fluent API on one side and write the job logic on the other — the two are disconnected.

// Without Pragmatic: Hangfire-style, where config is separate from the job
RecurringJob.AddOrUpdate<DailyReportJob>(
    "daily-report",
    x => x.Execute(), // Expression tree → reflection
    "0 2 * * *",
    new RecurringJobOptions { TimeZone = TimeZoneInfo.FindSystemTimeZoneById("Europe/Rome") });

// Retry is configured globally or per-filter, not on the job itself
GlobalJobFilters.Filters.Add(new AutomaticRetryAttribute { Attempts = 3 });

The job class has no idea how it will be scheduled, retried, or timed out. If you want different retry strategies for different jobs, you need per-job filter attributes that are evaluated at runtime.

With Pragmatic.Jobs, you declare scheduling, retry, and timeout directly on the job class. The source generator produces the invoker with inline retry loops, linked cancellation tokens, telemetry, and DI wiring — all at compile time, zero reflection.

[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 1000)]
[Timeout(TimeoutSeconds = 600)]
public sealed partial class DailyReportJob(IReportService reports) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
}

That is the entire job definition. The source generator (SG) produces:

  • DailyReportJob.Invoker.g.cs with retry loop + timeout + telemetry
  • Registration in _Infra.Jobs.Registration.g.cs
  • Type entry in _Infra.Jobs.TypeRegistry.g.cs (AOT-safe, no Type.GetType)
  • Recurring definition in _Infra.Jobs.RecurringJobs.g.cs
  • Metadata in _Metadata.Jobs.g.cs

Install the packages:

dotnet add package Pragmatic.Jobs
dotnet add package Pragmatic.Jobs.EFCore # For production persistence

Add the source generator:

<ProjectReference Include="..\Pragmatic.SourceGenerator\src\Pragmatic.SourceGenerator\Pragmatic.SourceGenerator.csproj"
OutputItemType="Analyzer"
ReferenceOutputAssembly="false" />

// Jobs/DailyReportJob.cs
using Pragmatic.Jobs;
using Pragmatic.Jobs.Attributes;

[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3)]
public sealed partial class DailyReportJob(IReportService reports) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
}

// Program.cs
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);
        jobs.WithPollingInterval(5);
        jobs.UseEfCore();
    });
});

// Schedule an on-demand job (scheduler is an injected IJobScheduler)
var jobId = await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, "guest@hotel.com"),
    delay: TimeSpan.FromHours(24));

The SG auto-discovers jobs by [Job] and [RecurringJob] attributes. No manual registration needed.


IJob is for jobs that need no input parameters: recurring cleanup, report generation, health checks.

public interface IJob
{
    Task ExecuteAsync(JobContext context, CancellationToken ct);
}

[RecurringJob("0 0 * * 0", Id = "cleanup-sessions", TimeZone = "Europe/Rome")]
public sealed partial class CleanupExpiredSessionsJob(AppDbContext db) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        var cutoff = context.ScheduledAt.AddDays(-30);
        await db.Sessions.Where(s => s.ExpiresAt < cutoff).ExecuteDeleteAsync(ct);
    }
}

IJob<TParams> is for jobs that require input data. The parameter type must be JSON-serializable.

public interface IJob<in TParams> where TParams : notnull
{
    Task ExecuteAsync(TParams parameters, JobContext context, CancellationToken ct);
}

public record InvoiceParams(Guid ReservationId, Guid GuestId, decimal Amount, string Currency);

[Job]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 2000)]
[Timeout(TimeoutSeconds = 120)]
public sealed partial class GenerateInvoiceJob(IBillingService billing) : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
        => await billing.GenerateAsync(p.ReservationId, p.Amount, p.Currency, ct);
}

Recurring jobs use standard 5-part cron expressions and run on a fixed schedule.

[RecurringJob("0 2 * * *", Id = "daily-report")]
//             │ │ │ │ │
//             │ │ │ │ └── Day of week (0-6, Sun=0)
//             │ │ │ └──── Month (1-12)
//             │ │ └────── Day of month (1-31)
//             │ └──────── Hour (0-23)
//             └────────── Minute (0-59)

| Expression | Schedule |
|---|---|
| 0 2 * * * | Daily at 2:00 AM |
| 0 0 * * 0 | Every Sunday at midnight |
| */15 * * * * | Every 15 minutes |
| 0 9 1 * * | First day of every month at 9:00 AM |
| 0 0 * * 1-5 | Every weekday at midnight |

Timezone support: The TimeZone property accepts IANA timezone names.

[RecurringJob("0 0 * * 0", Id = "cleanup", TimeZone = "Europe/Rome")]
public sealed partial class WeeklyCleanupJob : IJob { ... }

Auto-generated ID: If Id is not specified, the SG generates a kebab-cased ID from the class name (DailyReportJob becomes daily-report, so the trailing Job suffix is dropped).
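
To make the naming rule concrete, here is a minimal sketch of how such an ID could be derived. It assumes the trailing Job suffix is dropped and a hyphen is inserted before each uppercase letter; ToRecurringJobId is a hypothetical helper, and the generator's actual rules (for acronyms or digits, for example) are not specified here.

```csharp
using System;
using System.Text;

// Hypothetical helper: the real generator's rules may differ.
static string ToRecurringJobId(string className)
{
    // Assumption: a trailing "Job" suffix is dropped (DailyReportJob -> DailyReport).
    var name = className.Length > 3 && className.EndsWith("Job", StringComparison.Ordinal)
        ? className[..^3]
        : className;

    var sb = new StringBuilder(name.Length + 4);
    for (var i = 0; i < name.Length; i++)
    {
        var c = name[i];
        // Insert a hyphen before every uppercase letter except the first character.
        if (char.IsUpper(c) && i > 0)
            sb.Append('-');
        sb.Append(char.ToLowerInvariant(c));
    }
    return sb.ToString();
}

Console.WriteLine(ToRecurringJobId("DailyReportJob"));            // daily-report
Console.WriteLine(ToRecurringJobId("CleanupExpiredSessionsJob")); // cleanup-expired-sessions
```

Setting Id explicitly remains the safer choice when the ID is referenced elsewhere, since a class rename would otherwise change it.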

Jobs decorated with [Job] (not [RecurringJob]) are scheduled programmatically.

[Job]
public sealed partial class SendReminderJob : IJob<ReminderParams> { ... }

// Fire-and-forget (immediate)
var immediateId = await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, email));

// Delayed (runs in 24 hours)
var delayedId = await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, email),
    delay: TimeSpan.FromHours(24));

// At a specific time
var scheduledId = await scheduler.ScheduleAtAsync<SendReminderJob, ReminderParams>(
    new ReminderParams(reservationId, email),
    scheduledFor: checkout.AddHours(-2));

Jobs can declare automatic continuation: when job A completes, job B is enqueued.

[Job]
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
    {
        // Generate invoice...
    }
    // On success, SendInvoiceEmailJob is automatically enqueued
    // with the same CorrelationId for tracing.
}

[Job]
[Retry(MaxAttempts = 2)]
public sealed partial class SendInvoiceEmailJob : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        // Send email using context.CorrelationId to find the invoice
    }
}

The SG validates the chain at compile time:

  • The continuation type must implement IJob or IJob<T> (PRAG2505)
  • No cycles allowed: A->B->A or A->B->C->A are compile errors (PRAG2506)
  • Chain depth is checked via DAG traversal
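
The cycle check can be pictured as a depth-first search over the continuation graph. The sketch below is only an illustration of that idea, not the generator's code: it models jobs as plain strings and assumes a hypothetical map from each job to the jobs named in its [Continuation<T>] attributes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model: job name -> jobs named in its [Continuation<T>] attributes.
static bool HasCycle(IReadOnlyDictionary<string, string[]> continuations)
{
    // 1 = currently on the DFS path, 2 = fully explored; absent = not visited yet.
    var state = new Dictionary<string, int>();

    bool Visit(string job)
    {
        if (state.TryGetValue(job, out var s))
            return s == 1; // reaching a job that is still on the path means a cycle

        state[job] = 1;
        if (continuations.TryGetValue(job, out var next))
        {
            foreach (var n in next)
            {
                if (Visit(n))
                    return true;
            }
        }
        state[job] = 2;
        return false;
    }

    foreach (var job in continuations.Keys)
    {
        if (Visit(job))
            return true;
    }
    return false;
}

var valid = new Dictionary<string, string[]>
{
    ["GenerateInvoiceJob"] = new[] { "SendInvoiceEmailJob" },
};
var cyclic = new Dictionary<string, string[]>
{
    ["A"] = new[] { "B" },
    ["B"] = new[] { "C" },
    ["C"] = new[] { "A" },
};

Console.WriteLine(HasCycle(valid));  // False
Console.WriteLine(HasCycle(cyclic)); // True
```

In the generator, a positive result would correspond to reporting PRAG2506 on the offending job class.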

[Job] marks a class as a background job. It is required for SG discovery of non-recurring jobs.

[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class JobAttribute : Attribute;

[RecurringJob] marks a class as a recurring job with cron scheduling.

[RecurringJob("0 2 * * *", Id = "daily-report", TimeZone = "Europe/Rome")]

| Property | Type | Default | Description |
|---|---|---|---|
| CronExpression | string | (required) | 5-part cron expression |
| Id | string? | auto-generated | Unique recurring job ID |
| TimeZone | string? | UTC | IANA timezone for cron evaluation |

[Retry] configures retry behavior. The SG generates an inline retry loop in the invoker.

[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 1000)]

| Property | Type | Default | Description |
|---|---|---|---|
| MaxAttempts | int | 3 | Maximum retry attempts |
| Strategy | BackoffStrategy | Exponential | Fixed, Exponential, ExponentialWithJitter |
| BaseDelayMs | int | 1000 | Base delay between retries (ms) |

Backoff strategies:

| Strategy | Formula |
|---|---|
| Fixed | baseDelayMs |
| Exponential | baseDelayMs * 2^attempt |
| ExponentialWithJitter | baseDelayMs * 2^attempt + Random(0, baseDelayMs) |
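
As a worked example of these formulas, the sketch below computes the delay for each strategy. The function names are illustrative only and are not part of Pragmatic.Jobs; the jitter term assumes an exclusive upper bound, which the table does not state.

```csharp
using System;

// One function per row of the table above; names are hypothetical.
static double FixedDelayMs(int baseDelayMs) => baseDelayMs;

static double ExponentialDelayMs(int baseDelayMs, int attempt)
    => baseDelayMs * Math.Pow(2, attempt);

static double ExponentialWithJitterDelayMs(int baseDelayMs, int attempt, Random rng)
    // Assumption: jitter is drawn uniformly from [0, baseDelayMs).
    => ExponentialDelayMs(baseDelayMs, attempt) + rng.Next(0, baseDelayMs);

var rng = new Random();
for (var attempt = 0; attempt <= 3; attempt++)
{
    Console.WriteLine(
        $"attempt {attempt}: fixed={FixedDelayMs(1000)} ms, " +
        $"exponential={ExponentialDelayMs(1000, attempt)} ms, " +
        $"jittered={ExponentialWithJitterDelayMs(1000, attempt, rng)} ms");
}
// With baseDelayMs = 1000 the exponential column reads 1000, 2000, 4000, 8000;
// the jittered value lands somewhere in the 1000 ms window above each of those.
```

Jitter spreads retries from many failed jobs across a window instead of letting them all hit a recovering dependency at the same instant.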

[Timeout] sets an execution timeout. The SG generates a linked CancellationToken in the invoker.

[Timeout(TimeoutSeconds = 600)]

| Property | Type | Default | Description |
|---|---|---|---|
| TimeoutSeconds | int | 300 | Timeout per execution attempt (seconds) |
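
The linked-token pattern behind a per-attempt timeout looks roughly like the sketch below. RunWithTimeoutAsync is a hypothetical helper written for illustration, not the generated code; it relies only on the standard CancellationTokenSource.CreateLinkedTokenSource and CancelAfter APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: returns true if the work finished, false if it timed out.
static async Task<bool> RunWithTimeoutAsync(
    Func<CancellationToken, Task> work, TimeSpan timeout, CancellationToken ct)
{
    // The linked source cancels when either the caller's token fires or the timeout elapses.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(timeout);
    try
    {
        await work(cts.Token);
        return true;
    }
    catch (OperationCanceledException) when (cts.IsCancellationRequested && !ct.IsCancellationRequested)
    {
        // Cancelled by the timeout, not by the caller (for example, host shutdown).
        return false;
    }
}

var fast = await RunWithTimeoutAsync(
    _ => Task.CompletedTask, TimeSpan.FromSeconds(5), CancellationToken.None);
var slow = await RunWithTimeoutAsync(
    token => Task.Delay(Timeout.InfiniteTimeSpan, token), TimeSpan.FromMilliseconds(50), CancellationToken.None);

Console.WriteLine(fast); // True
Console.WriteLine(slow); // False
```

The timeout only takes effect if the job passes the token on to the operations it awaits, as the examples in this document do with ct.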

[Continuation<T>] declares automatic job chaining on successful completion.

[Continuation<SendInvoiceEmailJob>]

The next job is enqueued immediately after the current job completes successfully. The CorrelationId is carried forward.


The IJobScheduler interface is the primary API for scheduling on-demand jobs.

public interface IJobScheduler
{
    // Parameterless: immediate, delayed, or at a specific time
    Task<Guid> ScheduleAsync<TJob>(
        TimeSpan? delay = null,
        string? correlationId = null,
        CancellationToken ct = default) where TJob : IJob;

    Task<Guid> ScheduleAtAsync<TJob>(
        DateTimeOffset scheduledFor,
        string? correlationId = null,
        CancellationToken ct = default) where TJob : IJob;

    // With parameters
    Task<Guid> ScheduleAsync<TJob, TParams>(
        TParams parameters,
        TimeSpan? delay = null,
        string? correlationId = null,
        CancellationToken ct = default)
        where TJob : IJob<TParams>
        where TParams : notnull;

    Task<Guid> ScheduleAtAsync<TJob, TParams>(
        TParams parameters,
        DateTimeOffset scheduledFor,
        string? correlationId = null,
        CancellationToken ct = default)
        where TJob : IJob<TParams>
        where TParams : notnull;

    // Cancel
    Task CancelAsync(Guid jobId, CancellationToken ct = default);
}

// Fire-and-forget
await scheduler.ScheduleAsync<CleanupJob>();

// Delayed
await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new(reservationId, email),
    delay: TimeSpan.FromHours(24));

// At a specific time
await scheduler.ScheduleAtAsync<SendReminderJob, ReminderParams>(
    new(reservationId, email),
    scheduledFor: checkout.AddHours(-2));

// With correlation
await scheduler.ScheduleAsync<SendReminderJob, ReminderParams>(
    new(reservationId, email),
    delay: TimeSpan.FromHours(24),
    correlationId: $"reservation-{reservationId}");

// Cancel
await scheduler.CancelAsync(jobId);

Every job execution receives a JobContext with metadata about the current invocation.

public sealed record JobContext(
    Guid JobId,                 // Unique job instance ID
    string JobType,             // FQN of the job class
    DateTimeOffset ScheduledAt, // When scheduled to run
    int Attempt,                // Current attempt (0 = first)
    int MaxAttempts,            // Max retry attempts configured
    string? CorrelationId,      // Optional correlation for tracing
    string? TenantId);          // Optional tenant for multi-tenancy

public async Task ExecuteAsync(JobContext context, CancellationToken ct)
{
    _logger.LogInformation(
        "Job {JobId} attempt {Attempt}/{MaxAttempts} for {Date}",
        context.JobId, context.Attempt, context.MaxAttempts, context.ScheduledAt.Date);

    // Use CorrelationId to link with related operations
    using var activity = ActivitySource.StartActivity("GenerateReport");
    activity?.SetTag("correlationId", context.CorrelationId);
}

// With Pragmatic.Composition (PragmaticApp host)
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);         // Concurrent job tasks
        jobs.WithPollingInterval(5);     // Seconds between polls
        jobs.WithLeaseTime(300);         // Lease duration (5 min)
        jobs.WithMaxRetries(3);          // Default retry for jobs without [Retry]
        jobs.WithBatchSize(10);          // Pending jobs per poll
        jobs.WithWorkerId("worker-1");   // Custom worker ID
        jobs.UseEfCore();                // Production persistence
    });
});

// Without Composition (plain WebApplication host)
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPragmaticJobs(jobs =>
{
    jobs.WithWorkerCount(2);
    jobs.WithPollingInterval(10);
});
// Register processing services (Composition does this automatically)
builder.Services.AddJobProcessingServices();
var app = builder.Build();
app.Run();

| Option | Default | Description |
|---|---|---|
| WorkerCount | 2 | Number of concurrent job processing tasks |
| PollingIntervalSeconds | 5 | Seconds between polling for pending jobs |
| LeaseTimeSeconds | 300 | Lease duration for distributed locking |
| BatchSize | 10 | Maximum pending jobs to fetch per poll cycle |
| DefaultMaxRetries | 1 | Default max retries for jobs without [Retry] |
| WorkerId | auto-generated | Worker ID for lease acquisition |
| UseEfCore | false | Enable EF Core persistence |

In production, use EF Core for durable job storage:

dotnet add package Pragmatic.Jobs.EFCore

app.UseJobs(jobs => jobs.UseEfCore());

This replaces the default in-memory stores with:

  • EfCoreJobStore (implements IJobStore)
  • EfCoreRecurringJobStore (implements IRecurringJobStore)

| Table | Purpose |
|---|---|
| __Jobs | Job instances (pending, running, completed, failed) |
| __RecurringJobs | Recurring job definitions and next-run tracking |

Multiple app instances can process jobs safely. The lease-based locking uses optimistic SQL:

UPDATE __Jobs
SET LeasedBy = @worker, LeaseExpiresAt = @lease
WHERE Id = @id
AND (LeasedBy IS NULL OR LeaseExpiresAt < @now)

Only one worker acquires each job. If a worker crashes, the lease expires and another worker picks up the job.
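
The effect of that conditional UPDATE can be simulated in memory. The sketch below is a stand-in for a single __Jobs row, not the EfCoreJobStore implementation: TryAcquireLease is a hypothetical function, and a lock plays the role of the database's atomic check-and-set.

```csharp
using System;

// In-memory stand-in for the lease columns of one __Jobs row.
string? leasedBy = null;
var leaseExpiresAt = DateTimeOffset.MinValue;
var gate = new object();

// Hypothetical helper mirroring the UPDATE: returns true when "one row was affected".
bool TryAcquireLease(string worker, DateTimeOffset now, TimeSpan leaseTime)
{
    lock (gate)
    {
        // Negation of the WHERE clause: the row is leased and the lease has not expired.
        if (leasedBy is not null && leaseExpiresAt >= now)
            return false;

        leasedBy = worker;
        leaseExpiresAt = now + leaseTime;
        return true;
    }
}

var t0 = DateTimeOffset.UtcNow;
var lease = TimeSpan.FromSeconds(300);

Console.WriteLine(TryAcquireLease("worker-1", t0, lease));                 // True
Console.WriteLine(TryAcquireLease("worker-2", t0.AddSeconds(10), lease));  // False
Console.WriteLine(TryAcquireLease("worker-2", t0.AddSeconds(301), lease)); // True
```

The third call succeeds because worker-1's lease expired 300 seconds after t0, which is the crash-recovery path described above. It also suggests keeping the lease time at least as long as a job's timeout, so a healthy but slow job is not picked up twice.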


With Pragmatic.Composition, the SG auto-detects Pragmatic.Jobs via FeatureDetector and generates host registration:

// PragmaticHost.g.cs (auto-generated)
services.AddPragmaticJobs();
services.AddJobProcessingServices();

No manual registration needed. The SG also generates:

  • Job type registry (AOT-safe switch expression, no Type.GetType)
  • Recurring job definitions with cron expressions
  • DI registration for all discovered jobs

Two hosted services drive the job infrastructure:

| Service | Responsibility |
|---|---|
| JobProcessorService | Polls IJobStore for pending jobs, acquires leases, invokes jobs |
| RecurringJobSchedulerService | Evaluates cron expressions, enqueues recurring jobs when due |

Both are registered automatically by AddJobProcessingServices() (or by the SG in Composition mode).


Pragmatic.Messaging.Jobs bridges the two systems, enabling scheduled message delivery:

app.UseMessaging(msg =>
{
    msg.EnableScheduledMessages(); // Requires Pragmatic.Messaging.Jobs
});

This registers IMessageScheduler backed by IJobScheduler:

// Schedule a message for future delivery
var scheduleId = await messageScheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    delay: TimeSpan.FromHours(24));

// Or at a specific time
await messageScheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    scheduledAt: checkout.AddHours(-2));

// Cancel
await messageScheduler.CancelAsync(scheduleId);

Internally, this creates a PublishMessageJob that publishes the message to the bus when the scheduled time arrives.


ActivitySource: Pragmatic.Jobs

Every job execution creates a span with tags for job type, attempt number, correlation ID, and tenant.

| Metric | Type | Description |
|---|---|---|
| pragmatic.jobs.enqueued | Counter | Total jobs enqueued |
| pragmatic.jobs.completed | Counter | Total jobs completed successfully |
| pragmatic.jobs.failed | Counter | Total jobs that failed permanently |
| pragmatic.jobs.retried | Counter | Total retry attempts |
| pragmatic.jobs.duration | Histogram (ms) | Job execution duration |
| pragmatic.jobs.lease_acquisitions | Counter | Total lease acquisitions |
| pragmatic.jobs.lease_conflicts | Counter | Lease conflicts (another worker) |
| pragmatic.jobs.recurring_triggered | Counter | Recurring job triggers |

The source generator emits diagnostics for common mistakes:

| ID | Severity | Message |
|---|---|---|
| PRAG2500 | Error | Job class must implement IJob or IJob<T> |
| PRAG2501 | Error | Invalid or empty cron expression |
| PRAG2502 | Warning | Job class should be partial |
| PRAG2503 | Error | Duplicate recurring job ID |
| PRAG2504 | Error | [Retry] with MaxAttempts <= 0 |
| PRAG2505 | Error | [Continuation<T>] where T does not implement IJob |
| PRAG2506 | Error | Continuation cycle detected (A->B->A) |

| Store | Package | Use Case |
|---|---|---|
| InMemoryJobStore | Pragmatic.Jobs | Development and testing (default) |
| InMemoryRecurringJobStore | Pragmatic.Jobs | Development and testing (default) |
| EfCoreJobStore | Pragmatic.Jobs.EFCore | Production with database persistence |
| EfCoreRecurringJobStore | Pragmatic.Jobs.EFCore | Production with database persistence |

| Package | Description |
|---|---|
| Pragmatic.Jobs | Core: IJob, IJob<T>, IJobScheduler, InMemory stores, background services, attributes |
| Pragmatic.Jobs.EFCore | EF Core persistence: EfCoreJobStore, EfCoreRecurringJobStore, entity config |

For a project with two jobs, the SG produces:

obj/GeneratedFiles/Pragmatic.SourceGenerator/
├── DailyReportJob.Invoker.g.cs # Retry + timeout + telemetry
├── SendReminderJob.Invoker.g.cs # Retry + telemetry
├── _Infra.Jobs.Registration.g.cs # DI registration for all jobs
├── _Infra.Jobs.TypeRegistry.g.cs # AOT-safe type name → factory
├── _Infra.Jobs.RecurringJobs.g.cs # Recurring job definitions
└── _Metadata.Jobs.g.cs # Job metadata for tooling

The invoker wraps the original ExecuteAsync with the configured resilience pipeline:

// Simplified illustration of generated DailyReportJob.Invoker.g.cs
partial class DailyReportJob
{
    // "job" is the resolved DailyReportJob instance; how the real invoker obtains it is not shown.
    internal sealed class Invoker(DailyReportJob job)
    {
        public async Task InvokeAsync(JobContext context, CancellationToken ct)
        {
            // Retry loop: one initial attempt plus up to 3 retries
            for (var attempt = 0; attempt <= 3; attempt++)
            {
                // Timeout: a fresh linked token per attempt, so every attempt gets the full 600 s
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                cts.CancelAfter(TimeSpan.FromSeconds(600));
                try
                {
                    using var activity = JobsDiagnostics.ActivitySource.StartActivity("DailyReportJob");
                    var sw = Stopwatch.StartNew();
                    await job.ExecuteAsync(context with { Attempt = attempt }, cts.Token);
                    JobsDiagnostics.JobDuration.Record(sw.ElapsedMilliseconds);
                    JobsDiagnostics.JobsCompleted.Add(1);
                    return;
                }
                catch (Exception) when (attempt < 3)
                {
                    // The final failure is not caught here and propagates to the caller
                    JobsDiagnostics.JobsRetried.Add(1);
                    var delay = CalculateDelay(attempt, 1000, BackoffStrategy.ExponentialWithJitter);
                    await Task.Delay(delay, ct);
                }
            }
        }
    }
}

| Avoid | Prefer |
|---|---|
| ISourceGenerator | IIncrementalGenerator (SG handles this) |
| Reflection-based job discovery | [Job] / [RecurringJob] attributes |
| Global retry filters | [Retry] per-job |
| External scheduler configuration | Attributes on the job class |
| Manual DI registration | SG-generated registration |
| Type.GetType() for deserialization | SG-generated IJobTypeRegistry |

// Jobs/DailyReportJob.cs
[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter)]
[Timeout(TimeoutSeconds = 600)]
public sealed partial class DailyReportJob(
    IReportService reports,
    ILogger<DailyReportJob> logger) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        logger.LogInformation("Generating report for {Date}", context.ScheduledAt.Date);
        await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
    }
}

// Jobs/GenerateInvoiceJob.cs
public record InvoiceParams(Guid ReservationId, decimal Amount, string Currency);

[Job]
[Retry(MaxAttempts = 3)]
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob(IBillingService billing) : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
        => await billing.GenerateAsync(p.ReservationId, p.Amount, p.Currency, ct);
}

[Job]
[Retry(MaxAttempts = 2)]
public sealed partial class SendInvoiceEmailJob(IEmailService email) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await email.SendInvoiceAsync(context.CorrelationId!, ct);
}

// Program.cs
await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);
        jobs.WithPollingInterval(5);
        jobs.UseEfCore();
    });
});