
Architecture and Core Concepts

This guide explains why Pragmatic.Jobs exists, how its pieces fit together, and how to choose the right abstraction for each situation. Read this before diving into the individual feature guides.


Background job processing in .NET typically means pulling in Hangfire or Quartz.NET. Both are capable, but both rely on runtime reflection for job discovery, serialization, and invocation. They require separate dashboard packages, separate storage providers, and separate retry configuration that lives outside your job definition.

The Hangfire approach: config is separate from the job

// Job class -- knows nothing about how it will be scheduled
public class DailyReportJob
{
    private readonly IReportService _reports;

    public DailyReportJob(IReportService reports) => _reports = reports;

    public async Task Execute()
        => await _reports.GenerateDailyAsync(DateTime.UtcNow.Date);
}

// Somewhere else -- scheduling config is disconnected from the job
RecurringJob.AddOrUpdate<DailyReportJob>(
    "daily-report",
    x => x.Execute(), // Expression tree -> reflection at runtime
    "0 2 * * *",
    new RecurringJobOptions { TimeZone = TimeZoneInfo.FindSystemTimeZoneById("Europe/Rome") });

// Yet another place -- retry is global or per-filter
GlobalJobFilters.Filters.Add(new AutomaticRetryAttribute { Attempts = 3 });

Problems with this approach:

  1. Configuration is scattered. The job class, the schedule registration, and the retry policy all live in different places. To understand what a job does, you need to look in three locations.

  2. Reflection-heavy. RecurringJob.AddOrUpdate uses expression trees that are evaluated at runtime. Type.GetType() is used for deserialization. Neither is AOT-safe.

  3. No compile-time validation. If you typo a cron expression, rename a job class, or create a cycle between continuations, you find out at runtime — or not at all.

  4. Retry logic is external. Different jobs need different retry strategies, but configuring per-job retry requires filter attributes evaluated at runtime.

  5. No distributed locking out of the box. You need additional configuration for multi-instance deployments.


With Pragmatic.Jobs, you declare scheduling, retry, and timeout directly on the job class. The source generator reads these attributes at compile time and produces an invoker with inline retry loops, linked cancellation tokens, telemetry, and DI wiring — zero reflection.

[RecurringJob("0 2 * * *", Id = "daily-report")]
[Retry(MaxAttempts = 3, Strategy = BackoffStrategy.ExponentialWithJitter, BaseDelayMs = 1000)]
[Timeout(TimeoutSeconds = 600)]
public sealed partial class DailyReportJob(IReportService reports) : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
        => await reports.GenerateDailyAsync(context.ScheduledAt.Date, ct);
}

That is the entire job definition. Everything a developer needs to know about how this job is scheduled, retried, and timed out is right here in the class declaration.

The source generator produces:

| Generated File | Purpose |
| --- | --- |
| DailyReportJob.Invoker.g.cs | Retry loop + timeout + telemetry wrapper |
| _Infra.Jobs.Registration.g.cs | DI registration for all discovered jobs |
| _Infra.Jobs.TypeRegistry.g.cs | AOT-safe switch expression (no Type.GetType) |
| _Infra.Jobs.RecurringJobs.g.cs | Recurring job definitions with cron expressions |
| _Metadata.Jobs.g.cs | Job metadata for host aggregation |

  • Single source of truth. Schedule, retry, timeout, and continuation are all declared on the job class.
  • Compile-time validation. Invalid cron expressions (PRAG2501), duplicate recurring IDs (PRAG2503), continuation cycles (PRAG2506) — all caught at build time.
  • AOT-safe. The generated IJobTypeRegistry uses a switch expression on string FQNs. No Type.GetType(), no reflection.
  • Distributed-safe. Lease-based locking works out of the box with EF Core persistence.
  • Observable. Every execution creates an OpenTelemetry span with job type, attempt, and correlation ID. Metrics for enqueued, completed, failed, retried, and duration are built in.

The job processing pipeline has four stages: Definition, Generation, Scheduling, and Execution.

Definition: You write a class that implements IJob (parameterless) or IJob<T> (typed parameters) and decorate it with [Job] or [RecurringJob]:

[Job]
[Retry(MaxAttempts = 3)]
public sealed partial class SendReminderJob : IJob<ReminderParams>
{
    public async Task ExecuteAsync(ReminderParams p, JobContext context, CancellationToken ct)
    {
        // Your business logic
    }
}

Generation: The source generator (JobsFeature in Pragmatic.SourceGenerator) processes every class with [Job] or [RecurringJob]:

  1. Transform: JobTransform extracts attribute data into an immutable JobModel record.
  2. Validate: JobsFeature.ValidateJobs runs aggregate diagnostics (duplicate IDs, continuation cycles).
  3. Template: JobInvokerTemplate generates the per-job invoker. JobRegistrationTemplate, JobTypeRegistryTemplate, RecurringJobRegistrationTemplate, and JobMetadataTemplate generate infrastructure files.

Scheduling: Jobs enter the system in two ways:

  • Recurring: RecurringJobSchedulerService polls IRecurringJobStore every N seconds. When a recurring definition is due (based on cron evaluation in the configured timezone), it enqueues a JobInstance into IJobStore.
  • On-demand: Your code calls IJobScheduler.ScheduleAsync<TJob>(...) which creates a JobInstance with the appropriate ScheduledFor timestamp.

Execution: JobProcessorService is a BackgroundService that:

  1. Releases expired leases from crashed workers.
  2. Polls IJobStore.GetPendingAsync() for jobs due now.
  3. Acquires a lease (optimistic lock) on each job.
  4. Resolves the job from DI via IJobTypeRegistry.ExecuteAsync().
  5. Executes the generated invoker (which wraps your ExecuteAsync with retry, timeout, and telemetry).
  6. Marks the job as completed or failed.
  7. If completed and a continuation is declared, enqueues the next job.

[RecurringJob] ──> SG ──> RecurringJobSchedulerService ──> IJobStore
[Job] ──> IJobScheduler.ScheduleAsync() ──────────────────────┘
                                                              v
                                                     JobProcessorService
                                              ┌───────────────┼───────────────┐
                                              v               v               v
                                          Worker 1        Worker 2        Worker N
                                              │               │               │
                                       TryAcquireLease TryAcquireLease TryAcquireLease
                                              │               │               │
                                               IJobTypeRegistry.ExecuteAsync()
                                                      Generated Invoker
                                                (retry + timeout + telemetry)
                                                     Your ExecuteAsync()

| Scenario | Interface | Example |
| --- | --- | --- |
| No input data needed | IJob | Daily report, session cleanup, health check |
| Needs input data | IJob<T> | Send email to specific guest, generate invoice for reservation |

// Parameterless -- used for recurring tasks that derive context from the schedule
public sealed partial class CleanupExpiredSessionsJob : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        var cutoff = context.ScheduledAt.AddDays(-30);
        // ...
    }
}

// Typed parameters -- used for on-demand jobs that need specific input
public record ReminderParams(Guid ReservationId, Guid GuestId, string GuestEmail);

public sealed partial class SendCheckInReminderJob : IJob<ReminderParams>
{
    public async Task ExecuteAsync(ReminderParams p, JobContext context, CancellationToken ct)
    {
        // Send email to p.GuestEmail
    }
}

| Scenario | Attribute | Scheduling |
| --- | --- | --- |
| Fixed schedule (cron) | [RecurringJob("0 2 * * *")] | Automatic via RecurringJobSchedulerService |
| Fire-and-forget (now) | [Job] | scheduler.ScheduleAsync<T>() |
| Delayed (in N hours) | [Job] | scheduler.ScheduleAsync<T>(delay: TimeSpan.FromHours(24)) |
| At specific time | [Job] | scheduler.ScheduleAtAsync<T>(scheduledFor: ...) |

Rule of thumb: If the job runs on a calendar-based schedule, use [RecurringJob]. If the job is triggered by a business event (reservation confirmed, payment received), use [Job] and schedule it programmatically.

Continuations are for sequential job chains where the second job should only run if the first succeeds:

[Job]
[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob : IJob<InvoiceParams>
{
    public async Task ExecuteAsync(InvoiceParams p, JobContext context, CancellationToken ct)
    {
        // Generate invoice -- if this fails, the email is NOT sent
    }
}

[Job]
[Retry(MaxAttempts = 2)]
public sealed partial class SendInvoiceEmailJob : IJob
{
    public async Task ExecuteAsync(JobContext context, CancellationToken ct)
    {
        // Send email using context.CorrelationId to find the invoice
    }
}

The SG validates continuation chains at compile time:

  • PRAG2505: Continuation type must implement IJob or IJob<T>.
  • PRAG2506: No cycles allowed (A->B->A or A->B->C->A are compile errors).
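
A cycle check of this kind can be expressed as a depth-first search over the continuation graph. The sketch below is only an illustration of the idea, not the generator's actual code: `ContinuationCycleCheck`, `FindCycle`, and the string-keyed edge map are hypothetical names, and it assumes the continuation targets of every job are already known.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of Pragmatic.Jobs.
public static class ContinuationCycleCheck
{
    // edges maps a job type name to the job types it continues into.
    // Returns one cycle as a path (for example A, B, A), or null if there is none.
    public static IReadOnlyList<string>? FindCycle(
        IReadOnlyDictionary<string, IReadOnlyList<string>> edges)
    {
        var visited = new HashSet<string>();  // nodes whose exploration has started
        var path = new List<string>();        // current DFS path, in order
        var onPath = new HashSet<string>();   // same nodes as path, for fast lookup

        foreach (var start in edges.Keys)
        {
            var cycle = Visit(start);
            if (cycle is not null) return cycle;
        }
        return null;

        List<string>? Visit(string node)
        {
            if (onPath.Contains(node))
            {
                // Back edge: the cycle is the path from node's first occurrence.
                var from = path.IndexOf(node);
                var cycle = path.GetRange(from, path.Count - from);
                cycle.Add(node);
                return cycle;
            }
            // Visited but not on the current path: fully explored, no cycle through it.
            if (!visited.Add(node)) return null;

            path.Add(node);
            onPath.Add(node);
            if (edges.TryGetValue(node, out var targets))
            {
                foreach (var target in targets)
                {
                    var cycle = Visit(target);
                    if (cycle is not null) return cycle;
                }
            }
            path.RemoveAt(path.Count - 1);
            onPath.Remove(node);
            return null;
        }
    }
}
```

For a graph with edges A->B, B->C, and C->A, `FindCycle` returns a path that starts and ends on the same job; for A->B, B->C alone it returns null.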

Every job execution receives an immutable JobContext record with metadata about the current invocation:

public sealed record JobContext(
    Guid JobId,                  // Unique job instance ID
    string JobType,              // FQN of the job class
    DateTimeOffset ScheduledAt,  // When the job was scheduled to run
    int Attempt,                 // Current attempt number (0 = first)
    int MaxAttempts,             // Max retry attempts configured
    string? CorrelationId,       // Optional correlation for tracing
    string? TenantId);           // Optional tenant for multi-tenancy

Use JobContext for:

  • Logging: Include JobId, Attempt, and ScheduledAt in log messages.
  • Correlation: Use CorrelationId to link related operations across job chains.
  • Idempotency: Use JobId as an idempotency key so that a retried or re-leased job does not repeat its side effects in external systems.
  • Multi-tenancy: Use TenantId to scope database queries.
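
As a rough illustration of the idempotency point, a job could route each external side effect through a guard keyed by `JobContext.JobId`. Everything in this sketch is hypothetical (`IdempotencyGuard` and `RunOnceAsync` are not part of Pragmatic.Jobs), and the in-memory dictionary only works within one process; a real deployment would more likely keep the key in the database or pass it to the external system.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of Pragmatic.Jobs.
public sealed class IdempotencyGuard
{
    private readonly ConcurrentDictionary<Guid, bool> _seen = new();

    // Runs sideEffect only the first time a given key is presented.
    // Returns true if the side effect ran, false if the key was already recorded.
    public async Task<bool> RunOnceAsync(Guid key, Func<Task> sideEffect)
    {
        if (!_seen.TryAdd(key, true)) return false;
        try
        {
            await sideEffect();
            return true;
        }
        catch
        {
            // Forget the key so a later retry can attempt the side effect again.
            _seen.TryRemove(key, out _);
            throw;
        }
    }
}
```

Inside a job this would be called as `await guard.RunOnceAsync(context.JobId, () => SendEmailAsync(...))`, where `SendEmailAsync` stands in for whatever external call the job makes.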

When using Pragmatic.Composition, the SG auto-detects Pragmatic.Jobs via FeatureDetector and generates host registration in PragmaticHost.g.cs:

// Auto-generated -- no manual registration needed
services.AddPragmaticJobs();
services.AddJobProcessingServices();

Configure options via UseJobs():

await PragmaticApp.RunAsync(args, app =>
{
    app.UseJobs(jobs =>
    {
        jobs.WithWorkerCount(4);
        jobs.WithPollingInterval(5);
        jobs.UseEfCore();
    });
});

Without Pragmatic.Composition, register the services manually:

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPragmaticJobs(jobs =>
{
    jobs.WithWorkerCount(2);
    jobs.WithPollingInterval(10);
});
// Register processing services manually
builder.Services.AddJobProcessingServices();

| Option | Default | Description |
| --- | --- | --- |
| WorkerCount | 2 | Number of concurrent job processing tasks |
| PollingIntervalSeconds | 5 | Seconds between polling for pending jobs |
| LeaseTimeSeconds | 300 | Lease duration for distributed locking (5 min) |
| BatchSize | 10 | Maximum pending jobs to fetch per poll cycle |
| DefaultMaxRetries | 1 | Default max retries for jobs without [Retry] |
| WorkerId | auto-generated | Worker ID for lease acquisition |
| UseEfCore | false | Enable EF Core persistence |

For a project with two jobs (a recurring DailyReportJob and an on-demand SendReminderJob), the SG produces five kinds of files: one invoker per job plus four shared infrastructure files.

DailyReportJob.Invoker.g.cs wraps your ExecuteAsync with the configured resilience pipeline:

// Simplified illustration
partial class DailyReportJob
{
    // Assumed here: the job instance is injected by DI
    internal sealed class Invoker(DailyReportJob job)
    {
        public async Task InvokeAsync(JobContext context, CancellationToken ct)
        {
            // Retry loop (inline, no Polly dependency)
            for (var attempt = 0; attempt <= 3; attempt++)
            {
                // Timeout: linked CancellationToken, created per attempt
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                cts.CancelAfter(TimeSpan.FromSeconds(600));
                try
                {
                    using var activity = JobsDiagnostics.ActivitySource.StartActivity("DailyReportJob");
                    var sw = Stopwatch.StartNew();
                    await job.ExecuteAsync(context with { Attempt = attempt }, cts.Token);
                    JobsDiagnostics.JobDuration.Record(sw.ElapsedMilliseconds);
                    JobsDiagnostics.JobsCompleted.Add(1);
                    return;
                }
                catch (Exception) when (attempt < 3)
                {
                    // Not the last attempt: count the retry and back off
                    JobsDiagnostics.JobsRetried.Add(1);
                    var delay = CalculateDelay(attempt, 1000, BackoffStrategy.ExponentialWithJitter);
                    await Task.Delay(delay, ct);
                }
            }
        }
    }
}

Key details:

  • The retry loop is fully inlined — no Polly, no runtime reflection.
  • [Timeout] generates a linked CancellationTokenSource with CancelAfter.
  • [Retry] generates the loop with the configured BackoffStrategy and BaseDelayMs.
  • OpenTelemetry spans and metrics are always emitted.

_Infra.Jobs.Registration.g.cs registers all discovered jobs and their invokers in DI:

public static IServiceCollection AddPragmaticJobs(this IServiceCollection services)
{
    services.AddTransient<DailyReportJob>();
    services.AddTransient<DailyReportJob.Invoker>();
    services.AddTransient<SendReminderJob>();
    services.AddTransient<SendReminderJob.Invoker>();
    return services;
}

_Infra.Jobs.TypeRegistry.g.cs maps job FQN strings to invokers with an AOT-safe switch that replaces Type.GetType():

public sealed class PragmaticJobTypeRegistry : IJobTypeRegistry
{
    public async Task ExecuteAsync(string jobTypeFqn, string? parametersJson,
        JobContext context, IServiceProvider sp, CancellationToken ct)
    {
        switch (jobTypeFqn)
        {
            case "MyApp.Jobs.DailyReportJob":
                var job1 = sp.GetRequiredService<DailyReportJob.Invoker>();
                await job1.InvokeAsync(context, ct);
                break;
            case "MyApp.Jobs.SendReminderJob":
                var job2 = sp.GetRequiredService<SendReminderJob.Invoker>();
                await job2.InvokeAsync(parametersJson, context, ct);
                break;
            default:
                throw new InvalidOperationException($"Unknown job: {jobTypeFqn}");
        }
    }
}

_Infra.Jobs.RecurringJobs.g.cs registers recurring definitions at startup:

public static async Task RegisterRecurringJobs(IRecurringJobStore store)
{
    // Await the upsert so a failure surfaces at startup instead of being dropped
    await store.UpsertAsync(new RecurringJobDefinition
    {
        Id = "daily-report",
        JobType = "MyApp.Jobs.DailyReportJob",
        CronExpression = "0 2 * * *",
        TimeZoneId = null, // UTC
        IsEnabled = true
    });
}

_Metadata.Jobs.g.cs exposes job metadata for host aggregation and tooling.


[Job] marks a class as a background job. Required for SG discovery on non-recurring jobs.

[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class JobAttribute : Attribute;

[RecurringJob] marks a class as a recurring job with cron scheduling.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| CronExpression | string | (required) | 5-part cron expression |
| Id | string? | auto-generated (kebab-case) | Unique recurring job ID |
| TimeZone | string? | UTC | IANA timezone for cron evaluation |
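
To make "5-part cron expression" concrete, here is a minimal structural check in the spirit of the PRAG2501 diagnostic. It is a sketch under assumptions, not the generator's validator: `CronSketch` is a hypothetical name, and it accepts only numbers, `*`, ranges, lists, and steps. The syntax Pragmatic.Jobs actually accepts (month or weekday names, `7` for Sunday, and so on) may differ.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for illustration; not part of Pragmatic.Jobs.
public static class CronSketch
{
    // Allowed value range per field: minute, hour, day of month, month, day of week.
    private static readonly (int Min, int Max)[] Ranges =
    {
        (0, 59), (0, 23), (1, 31), (1, 12), (0, 6)
    };

    public static bool IsValid(string expression)
    {
        var fields = expression.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (fields.Length != 5) return false;

        for (var i = 0; i < fields.Length; i++)
        {
            // A field is a comma-separated list of items such as "1,15" or "*/10".
            foreach (var item in fields[i].Split(','))
            {
                if (!IsValidItem(item, Ranges[i].Min, Ranges[i].Max)) return false;
            }
        }
        return true;
    }

    private static bool IsValidItem(string item, int min, int max)
    {
        // Optional step suffix: "*/15" or "10-40/5".
        var parts = item.Split('/');
        if (parts.Length > 2) return false;
        if (parts.Length == 2 && !(TryParseDigits(parts[1], out var step) && step > 0)) return false;

        if (parts[0] == "*") return true;

        // Single value "5" or range "1-5".
        var bounds = parts[0].Split('-');
        if (bounds.Length > 2) return false;
        if (!TryParseDigits(bounds[0], out var low)) return false;
        var high = low;
        if (bounds.Length == 2 && !TryParseDigits(bounds[1], out high)) return false;
        return min <= low && low <= high && high <= max;
    }

    // Digits only: rejects signs, whitespace, and empty strings.
    private static bool TryParseDigits(string s, out int value) =>
        int.TryParse(s, NumberStyles.None, CultureInfo.InvariantCulture, out value);
}
```

With this check, `"0 2 * * *"` passes, `"0 25 * * *"` fails because 25 is outside the hour range, and `"0 2 * *"` fails because it has only four fields.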

[Retry] configures retry behavior. The SG generates an inline retry loop in the invoker.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| MaxAttempts | int | 3 | Maximum retry attempts |
| Strategy | BackoffStrategy | Exponential | Fixed, Exponential, ExponentialWithJitter |
| BaseDelayMs | int | 1000 | Base delay between retries (ms) |

Backoff formulas:

| Strategy | Formula |
| --- | --- |
| Fixed | baseDelayMs |
| Exponential | baseDelayMs * 2^attempt |
| ExponentialWithJitter | baseDelayMs * 2^attempt + Random(0, baseDelayMs) |
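
The three formulas can be written out as a small function. This is a sketch rather than the generated `CalculateDelay`: the enum is redeclared locally so the snippet compiles on its own, the optional `Random` parameter is an addition for testability, and it assumes `attempt` is zero-based as in `JobContext.Attempt`.

```csharp
#nullable enable
using System;

// Local stand-in mirroring the strategy names in the table above.
public enum BackoffStrategy { Fixed, Exponential, ExponentialWithJitter }

// Hypothetical helper for illustration; not the generated code.
public static class BackoffSketch
{
    public static TimeSpan CalculateDelay(
        int attempt, int baseDelayMs, BackoffStrategy strategy, Random? random = null)
    {
        // baseDelayMs * 2^attempt, computed in double to avoid integer overflow.
        double exponential = baseDelayMs * Math.Pow(2, attempt);

        double ms = strategy switch
        {
            BackoffStrategy.Fixed => baseDelayMs,
            BackoffStrategy.Exponential => exponential,
            // Jitter is a uniform integer in [0, baseDelayMs).
            BackoffStrategy.ExponentialWithJitter =>
                exponential + (random ?? Random.Shared).Next(0, baseDelayMs),
            _ => throw new ArgumentOutOfRangeException(nameof(strategy))
        };
        return TimeSpan.FromMilliseconds(ms);
    }
}
```

With `baseDelayMs = 1000` and the Exponential strategy, attempts 0, 1, and 2 wait 1000 ms, 2000 ms, and 4000 ms. Jitter adds less than one base delay on top, which spreads out workers that failed at the same moment.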

[Timeout] sets an execution timeout per attempt.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| TimeoutSeconds | int | 300 | Timeout per execution attempt (seconds) |

[Continuation<T>] declares automatic job chaining on successful completion. The next job is enqueued with the same CorrelationId.

[Continuation<SendInvoiceEmailJob>]
public sealed partial class GenerateInvoiceJob : IJob<InvoiceParams> { ... }

| Store | Package | Use Case |
| --- | --- | --- |
| InMemoryJobStore | Pragmatic.Jobs | Development and testing (default) |
| InMemoryRecurringJobStore | Pragmatic.Jobs | Development and testing (default) |
| EfCoreJobStore | Pragmatic.Jobs.EFCore | Production with database persistence |
| EfCoreRecurringJobStore | Pragmatic.Jobs.EFCore | Production with database persistence |

Multiple app instances can process jobs safely. The lease-based locking uses optimistic SQL:

UPDATE __Jobs
SET LeasedBy = @worker, LeaseExpiresAt = @lease
WHERE Id = @id
AND (LeasedBy IS NULL OR LeaseExpiresAt < @now)

Only one worker acquires each job. If a worker crashes, the lease expires after LeaseTimeSeconds and another worker picks up the job.
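
The UPDATE statement is effectively a compare-and-set: the row changes only if nobody holds a live lease. The following in-memory analogue illustrates that rule. `LeaseTable` and its members are hypothetical names, and a lock stands in for the atomicity that the database provides for a single UPDATE.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical in-memory analogue of the UPDATE above; not the library's store.
public sealed class LeaseTable
{
    private sealed class Row
    {
        public string? LeasedBy;
        public DateTimeOffset LeaseExpiresAt;
    }

    private readonly Dictionary<Guid, Row> _rows = new();
    private readonly object _gate = new();

    public void AddJob(Guid id)
    {
        lock (_gate) _rows[id] = new Row();
    }

    // Mirrors: WHERE Id = @id AND (LeasedBy IS NULL OR LeaseExpiresAt < @now)
    public bool TryAcquireLease(Guid id, string worker, DateTimeOffset now, TimeSpan leaseTime)
    {
        lock (_gate)
        {
            if (!_rows.TryGetValue(id, out var row)) return false;

            // Someone else holds a lease that has not expired: zero rows affected.
            if (row.LeasedBy is not null && row.LeaseExpiresAt >= now) return false;

            row.LeasedBy = worker;
            row.LeaseExpiresAt = now + leaseTime;
            return true; // one row affected
        }
    }
}
```

With a 5-minute lease, worker A acquiring at time t blocks worker B at t + 1 minute, while B succeeds at t + 6 minutes because A's lease has expired by then.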

| Table | Purpose |
| --- | --- |
| __Jobs | Job instances (pending, running, completed, failed, cancelled) |
| __RecurringJobs | Recurring job definitions and next-run tracking |

Two hosted services drive the job infrastructure:

| Service | Responsibility |
| --- | --- |
| JobProcessorService | Polls IJobStore for pending jobs, acquires leases, invokes via IJobTypeRegistry |
| RecurringJobSchedulerService | Evaluates cron expressions against IRecurringJobStore, enqueues when due |

Both are registered by AddJobProcessingServices() (or automatically by the SG in Composition mode). Both skip execution if IJobTypeRegistry is NullJobTypeRegistry (meaning no jobs were discovered).


ActivitySource: Pragmatic.Jobs

Every job execution creates a span with tags: job.id, job.type, job.attempt.

| Metric | Type | Description |
| --- | --- | --- |
| pragmatic.jobs.enqueued | Counter | Total jobs enqueued |
| pragmatic.jobs.completed | Counter | Total jobs completed successfully |
| pragmatic.jobs.failed | Counter | Total jobs that failed permanently |
| pragmatic.jobs.retried | Counter | Total retry attempts |
| pragmatic.jobs.duration | Histogram (ms) | Job execution duration |
| pragmatic.jobs.lease_acquisitions | Counter | Total lease acquisitions |
| pragmatic.jobs.lease_conflicts | Counter | Lease conflicts (another worker won) |
| pragmatic.jobs.recurring_triggered | Counter | Recurring job triggers |
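
In a test or a quick diagnostic, these instruments can be observed without an OpenTelemetry exporter by using the standard `System.Diagnostics.Metrics.MeterListener`. The sketch below assumes only the instrument names from the table; `JobMetricsProbe` is a hypothetical name, and because the numeric type of each instrument is not stated here, it subscribes to `long`, `int`, and `double` measurements.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics.Metrics;

// Hypothetical helper for illustration; not part of Pragmatic.Jobs.
public sealed class JobMetricsProbe : IDisposable
{
    private readonly MeterListener _listener = new();

    // Sum of all recorded values per instrument name.
    // For the duration histogram this is total milliseconds, not a count.
    public ConcurrentDictionary<string, double> Totals { get; } = new();

    public JobMetricsProbe()
    {
        // Subscribe to every instrument whose name starts with the documented prefix.
        _listener.InstrumentPublished = (instrument, listener) =>
        {
            if (instrument.Name.StartsWith("pragmatic.jobs.", StringComparison.Ordinal))
                listener.EnableMeasurementEvents(instrument);
        };

        _listener.SetMeasurementEventCallback<long>(
            (instrument, value, tags, state) => Add(instrument.Name, value));
        _listener.SetMeasurementEventCallback<int>(
            (instrument, value, tags, state) => Add(instrument.Name, value));
        _listener.SetMeasurementEventCallback<double>(
            (instrument, value, tags, state) => Add(instrument.Name, value));

        _listener.Start();
    }

    private void Add(string name, double value) =>
        Totals.AddOrUpdate(name, value, (_, current) => current + value);

    public void Dispose() => _listener.Dispose();
}
```

After some jobs have run, `probe.Totals` holds one entry per instrument that recorded a value, keyed by names such as `pragmatic.jobs.completed`.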

Pragmatic.Messaging.Jobs bridges the two systems, enabling scheduled message delivery:

app.UseMessaging(msg =>
{
    msg.EnableScheduledMessages(); // Requires Pragmatic.Messaging.Jobs
});

// Schedule a message for future delivery
var scheduleId = await messageScheduler.ScheduleAsync(
    new CheckoutReminder(reservationId),
    delay: TimeSpan.FromHours(24));

Internally, this creates a PublishMessageJob that publishes the message to the bus when the scheduled time arrives.

The Showcase application demonstrates both job types:

  • NoShowDetectionJob (examples/showcase/src/Showcase.Booking/Infrastructure/Jobs/) — recurring hourly job that detects no-show reservations.
  • SendCheckInReminderJob (examples/showcase/src/Showcase.Booking/Infrastructure/Jobs/) — delayed job with typed parameters, scheduled 24h before check-in.

| Package | Description |
| --- | --- |
| Pragmatic.Jobs | Core: IJob, IJob<T>, IJobScheduler, InMemory stores, background services, attributes |
| Pragmatic.Jobs.EFCore | EF Core persistence: EfCoreJobStore, EfCoreRecurringJobStore, entity config |