Skip to content

API Reference

This section contains the API documentation for Jobify, automatically generated from the source code.

jobify

Core scheduling components for the jobify framework.

This module provides the basic classes for job scheduling and management. It exposes the main scheduler interface and job planning components that form the basis of the jobify asynchronous job scheduling system.

INJECT = object() module-attribute

Cron dataclass

Configuration for cron-based job scheduling.

Attributes:

Name Type Description
expression str

The crontab-formatted expression.

max_runs int

Maximum number of times the job can be triggered. Defaults to infinity.

max_failures int

Maximum number of consecutive failures before disabling the job. Defaults to 10.

misfire_policy MisfirePolicy | GracePolicy

Policy to handle missed job executions. Defaults to MisfirePolicy.ONCE.

start_date datetime | None

Optional datetime when the cron job becomes active.

args tuple[Any, ...]

Positional arguments to pass to the job.

kwargs dict[str, Any]

Keyword arguments to pass to the job.

Source code in src/jobify/_internal/configuration.py
@dataclass(slots=True, kw_only=True, order=True)
class Cron:
    """Configuration for cron-based job scheduling.

    Attributes:
        expression: The crontab-formatted expression.
        max_runs: Maximum number of times the job can be triggered.
            Defaults to infinity.
        max_failures: Maximum number of consecutive failures before disabling the job.
            Defaults to 10.
        misfire_policy: Policy to handle missed job executions.
            Defaults to MisfirePolicy.ONCE.
        start_date: Optional datetime when the cron job becomes active.
        args: Positional arguments to pass to the job.
        kwargs: Keyword arguments to pass to the job.

    """

    # kw_only=False allows ``expression`` to be passed positionally even
    # though the dataclass as a whole is declared kw_only.
    expression: str = field(kw_only=False)
    max_runs: int = INFINITY
    max_failures: int = 10
    misfire_policy: MisfirePolicy | GracePolicy = MisfirePolicy.ONCE
    start_date: datetime | None = None
    args: tuple[Any, ...] = ()
    kwargs: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Validate the configuration eagerly, at construction time.

        Raises:
            ValueError: If ``max_failures`` is less than 1.

        """
        if self.max_failures < 1:
            # Fixed: the message previously referred to "max_cron_failures",
            # which does not match the actual attribute name and made the
            # error non-actionable for callers.
            msg = "max_failures must be >= 1. Use 1 for 'stop on first error'."
            raise ValueError(msg)

CronContext

Bases: Generic[ReturnT]

Holds configuration and state for a cron-based job.

Attributes:

Name Type Description
cron

The cron configuration.

cron_parser

The parser used to calculate next run times.

failure_count

Number of consecutive failures.

job

The associated job instance.

offset

The base datetime from which the next run is calculated.

run_count

Number of times this job has been executed.

Source code in src/jobify/_internal/scheduler/job.py
class CronContext(Generic[ReturnT]):
    """Per-job mutable state for cron scheduling.

    Bundles a job together with its cron configuration, the parser that
    computes future run times, and the run/failure counters the scheduler
    consults when deciding whether to trigger the job again.

    Attributes:
        cron: The cron configuration.
        cron_parser: The parser used to calculate next run times.
        failure_count: Number of consecutive failures.
        job: The associated job instance.
        offset: The base datetime from which the next run is calculated.
        run_count: Number of times this job has been executed.

    """

    __slots__: tuple[str, ...] = (
        "cron",
        "cron_parser",
        "failure_count",
        "job",
        "offset",
        "run_count",
    )

    def __init__(  # noqa: PLR0913
        self,
        *,
        cron: Cron,
        cron_parser: CronParser,
        failure_count: int = 0,
        job: Job[ReturnT],
        offset: datetime,
        run_count: int,
    ) -> None:
        """Initialize the CronContext.

        Args:
            cron: The cron configuration.
            cron_parser: The parser used to calculate next run times.
            failure_count: Initial failure count.
            job: The associated job instance.
            offset: The base datetime.
            run_count: Initial run count.

        """
        self.job = job
        self.cron = cron
        self.cron_parser = cron_parser
        self.offset = offset
        self.run_count = run_count
        self.failure_count = failure_count

    def is_run_exceeded_by_limit(self) -> bool:
        """Report whether the configured run limit has been reached.

        Returns:
            True if the run limit is reached, False otherwise.

        """
        limit = self.cron.max_runs
        # A job with an unlimited budget can never exhaust it.
        return limit != INFINITY and self.run_count >= limit

    def is_failure_allowed_by_limit(self) -> bool:
        """Report whether the job may fail again without being disabled.

        Returns:
            True if the failure limit has not been reached, False otherwise.

        """
        return self.cron.max_failures > self.failure_count

__init__(*, cron, cron_parser, failure_count=0, job, offset, run_count)

Initialize the CronContext.

Parameters:

Name Type Description Default
cron Cron

The cron configuration.

required
cron_parser CronParser

The parser used to calculate next run times.

required
failure_count int

Initial failure count.

0
job Job[ReturnT]

The associated job instance.

required
offset datetime

The base datetime.

required
run_count int

Initial run count.

required
Source code in src/jobify/_internal/scheduler/job.py
def __init__(  # noqa: PLR0913
    self,
    *,
    cron: Cron,
    cron_parser: CronParser,
    failure_count: int = 0,
    job: Job[ReturnT],
    offset: datetime,
    run_count: int,
) -> None:
    """Initialize the CronContext.

    Args:
        cron: The cron configuration.
        cron_parser: The parser used to calculate next run times.
        failure_count: Initial failure count.
        job: The associated job instance.
        offset: The base datetime.
        run_count: Initial run count.

    """
    # Plain attribute assignments; no validation happens here, so callers
    # are responsible for passing consistent counters (e.g. when restoring
    # a persisted job).
    self.cron = cron
    self.cron_parser = cron_parser
    self.failure_count = failure_count
    self.job = job
    self.offset = offset
    self.run_count = run_count

is_failure_allowed_by_limit()

Check if the job can still fail based on the allowed limit.

Returns:

Type Description
bool

True if the failure limit has not been reached, False otherwise.

Source code in src/jobify/_internal/scheduler/job.py
def is_failure_allowed_by_limit(self) -> bool:
    """Report whether the job may fail again without being disabled.

    Returns:
        True if the failure limit has not been reached, False otherwise.

    """
    return self.cron.max_failures > self.failure_count

is_run_exceeded_by_limit()

Check if the maximum number of runs has been exceeded.

Returns:

Type Description
bool

True if the run limit is reached, False otherwise.

Source code in src/jobify/_internal/scheduler/job.py
def is_run_exceeded_by_limit(self) -> bool:
    """Report whether the configured maximum number of runs was reached.

    Returns:
        True if the run limit is reached, False otherwise.

    """
    limit = self.cron.max_runs
    # A job with an unlimited budget can never exhaust it.
    return limit != INFINITY and self.run_count >= limit

GracePolicy

Bases: NamedTuple

Grace period policy for handling misfired jobs.

Attributes:

Name Type Description
value timedelta

The duration of the grace period.

Source code in src/jobify/_internal/scheduler/misfire_policy.py
class GracePolicy(NamedTuple):
    """Grace period policy for handling misfired jobs.

    Attributes:
        value: The duration of the grace period.

    """

    value: timedelta

    @override
    def __str__(self) -> str:
        return f"<{type(self).__name__} value={self.value}>"

Job

Bases: Generic[ReturnT]

Represents a scheduled job.

Attributes:

Name Type Description
id

Unique identifier for the job.

status

Current status of the job.

exec_at

The scheduled execution time.

exception Exception | None

The exception raised, if any.

Source code in src/jobify/_internal/scheduler/job.py
class Job(Generic[ReturnT]):
    """Represents a scheduled job.

    A Job is awaitable: ``await job`` waits for completion and then returns
    (or raises) the job's result via :meth:`result`.

    Attributes:
        id: Unique identifier for the job.
        status: Current status of the job.
        exec_at: The scheduled execution time.
        exception: The exception raised, if any.

    """

    __slots__: tuple[str, ...] = (
        "_cron_context",
        "_event",
        "_handle",
        "_result",
        "_storage",
        "_unregister_hook",
        "exception",
        "exec_at",
        "id",
        "status",
    )

    def __init__(
        self,
        *,
        job_id: str,
        storage: Storage,
        exec_at: datetime,
        unregister_hook: Callable[[str], None],
        job_status: JobStatus = JobStatus.PENDING,
    ) -> None:
        """Initialize the Job.

        Args:
            job_id: Unique identifier for the job.
            storage: Storage backend.
            exec_at: The scheduled execution time.
            unregister_hook: Callback to unregister the job.
            job_status: Initial status of the job.

        """
        self._unregister_hook = unregister_hook
        self._event = asyncio.Event()
        # UNSET is a sentinel distinguishing "no result yet" from a result
        # that is legitimately None/falsy.
        self._result: ReturnT = UNSET
        self._storage = storage
        self._handle: asyncio.Handle | None = None
        self._cron_context: CronContext[ReturnT] | None = None
        self.id = job_id
        self.status = job_status
        self.exception: Exception | None = None
        self.exec_at = exec_at

        # NOTE(review): the event starts set, so is_done()/wait() treat a
        # freshly constructed job as "done" until update() re-arms it with a
        # new Event — confirm this is the intended handshake with the
        # scheduler's (re)scheduling path.
        self._event.set()

    @property
    def cron_expression(self) -> str | None:
        """Return the cron expression if this is a cron job, else None."""
        if self._cron_context is not None:
            return self._cron_context.cron.expression
        return None

    @override
    def __repr__(self) -> str:
        # Include the cron expression only for cron jobs to keep one-off
        # jobs' reprs short.
        if self.cron_expression is not None:
            cron_info = f", cron={self.cron_expression!r}"
        else:
            cron_info = ""
        return (
            f"<{type(self).__name__} "
            f"id={self.id!r}, "
            f"status={self.status.value!r}, "
            f"exec_at={self.exec_at.isoformat()!r}"
            f"{cron_info}>"
        )

    def __await__(self) -> Generator[object, None, ReturnT]:
        # Awaiting a Job means: wait for completion, then surface the result
        # (or raise) exactly as result() would.
        async def _await() -> ReturnT:
            await self.wait()
            return self.result()

        return _await().__await__()

    def bind_handle(self, handle: asyncio.Handle) -> None:
        """Bind an asyncio handle to the job.

        Args:
            handle: The handle to bind.

        """
        self._handle = handle

    def bind_cron_context(self, ctx: CronContext[ReturnT]) -> None:
        """Bind a cron context to the job.

        Args:
            ctx: The cron context to bind.

        """
        self._cron_context = ctx

    def result(self) -> ReturnT:
        """Return the result of the job.

        Returns:
            The job result.

        Raises:
            JobFailedError: If the job failed.
            JobNotCompletedError: If the job is not yet completed.

        """
        # A stored result counts even if status is not SUCCESS (e.g. a cron
        # job that already produced a value on an earlier run).
        if self.status is JobStatus.SUCCESS or self._result is not UNSET:
            return self._result
        if self.status is JobStatus.FAILED:
            # Chain the original exception as the cause for full tracebacks.
            raise JobFailedError(
                self.id,
                reason=str(self.exception),
            ) from self.exception
        raise JobNotCompletedError

    def set_result(self, val: ReturnT, *, status: JobStatus) -> None:
        """Set the result of the job.

        Args:
            val: The job result.
            status: The status to set.

        """
        self._result = val
        self.status = status

    def set_exception(self, exc: Exception, *, status: JobStatus) -> None:
        """Set the exception for the job.

        Args:
            exc: The exception to set.
            status: The status to set.

        """
        self.exception = exc
        self.status = status

    def update(self, *, exec_at: datetime, status: JobStatus) -> None:
        """Update the job schedule and status.

        Args:
            exec_at: The new scheduled time.
            status: The new status.

        """
        # A brand-new Event re-arms wait()/is_done() for the next run;
        # coroutines still waiting on the old event are unaffected.
        self._event = asyncio.Event()
        self.status = status
        self.exec_at = exec_at

    def is_done(self) -> bool:
        """Check if the job is done.

        Returns:
            True if the job is done, False otherwise.

        """
        return self._event.is_set()

    def is_cron(self) -> bool:
        """Check if this is a cron job.

        Returns:
            True if it's a cron job, False otherwise.

        """
        return self._cron_context is not None

    def is_reschedulable(self) -> bool:
        """Check if the job can be rescheduled.

        Returns:
            True if reschedulable, False otherwise.

        """
        # PERMANENTLY_FAILED and CANCELLED are terminal; every other status
        # (including FAILED/TIMEOUT) may be scheduled again.
        return self.status not in (
            JobStatus.PERMANENTLY_FAILED,
            JobStatus.CANCELLED,
        )

    async def wait(self) -> None:
        """Wait until the job is done.

        If the job is already completed, this method returns immediately.
        Safe for concurrent use by multiple coroutines.
        """
        await self._event.wait()

    async def cancel(self) -> None:
        """Cancel the job."""
        # Flip the status before waking waiters so they observe CANCELLED.
        self.status = JobStatus.CANCELLED
        self._cancel()
        # Remove the persisted schedule last; the in-memory job is already
        # torn down at this point.
        await self._storage.delete_schedule(self.id)

    def _cancel(self) -> None:
        """Handle cancellation internally."""
        # Wake all waiters, drop the scheduler registration, then cancel the
        # pending loop callback if one was bound.
        self._event.set()
        self._unregister_hook(self.id)
        if self._handle is not None:
            self._handle.cancel()

cron_expression property

Return the cron expression if this is a cron job, else None.

__init__(*, job_id, storage, exec_at, unregister_hook, job_status=JobStatus.PENDING)

Initialize the Job.

Parameters:

Name Type Description Default
job_id str

Unique identifier for the job.

required
storage Storage

Storage backend.

required
exec_at datetime

The scheduled execution time.

required
unregister_hook Callable[[str], None]

Callback to unregister the job.

required
job_status JobStatus

Initial status of the job.

PENDING
Source code in src/jobify/_internal/scheduler/job.py
def __init__(
    self,
    *,
    job_id: str,
    storage: Storage,
    exec_at: datetime,
    unregister_hook: Callable[[str], None],
    job_status: JobStatus = JobStatus.PENDING,
) -> None:
    """Initialize the Job.

    Args:
        job_id: Unique identifier for the job.
        storage: Storage backend.
        exec_at: The scheduled execution time.
        unregister_hook: Callback to unregister the job.
        job_status: Initial status of the job.

    """
    self._unregister_hook = unregister_hook
    self._event = asyncio.Event()
    # UNSET is a sentinel distinguishing "no result yet" from a result that
    # is legitimately None/falsy.
    self._result: ReturnT = UNSET
    self._storage = storage
    self._handle: asyncio.Handle | None = None
    self._cron_context: CronContext[ReturnT] | None = None
    self.id = job_id
    self.status = job_status
    self.exception: Exception | None = None
    self.exec_at = exec_at

    # NOTE(review): the event starts set, so a freshly constructed job reads
    # as "done" until update() re-arms it — confirm this is intentional.
    self._event.set()

bind_cron_context(ctx)

Bind a cron context to the job.

Parameters:

Name Type Description Default
ctx CronContext[ReturnT]

The cron context to bind.

required
Source code in src/jobify/_internal/scheduler/job.py
def bind_cron_context(self, ctx: CronContext[ReturnT]) -> None:
    """Attach cron scheduling state to this job.

    Args:
        ctx: The cron context to bind.

    """
    self._cron_context = ctx

bind_handle(handle)

Bind an asyncio handle to the job.

Parameters:

Name Type Description Default
handle Handle

The handle to bind.

required
Source code in src/jobify/_internal/scheduler/job.py
def bind_handle(self, handle: asyncio.Handle) -> None:
    """Attach the event-loop handle that controls this job's callback.

    Args:
        handle: The handle to bind.

    """
    self._handle = handle

cancel() async

Cancel the job.

Source code in src/jobify/_internal/scheduler/job.py
async def cancel(self) -> None:
    """Cancel the job."""
    # Flip the status before waking waiters so they observe CANCELLED.
    self.status = JobStatus.CANCELLED
    self._cancel()
    # Remove the persisted schedule last; the in-memory job is already
    # torn down at this point.
    await self._storage.delete_schedule(self.id)

is_cron()

Check if this is a cron job.

Returns:

Type Description
bool

True if it's a cron job, False otherwise.

Source code in src/jobify/_internal/scheduler/job.py
def is_cron(self) -> bool:
    """Report whether this job was scheduled via a cron expression.

    Returns:
        True if it's a cron job, False otherwise.

    """
    ctx = self._cron_context
    return ctx is not None

is_done()

Check if the job is done.

Returns:

Type Description
bool

True if the job is done, False otherwise.

Source code in src/jobify/_internal/scheduler/job.py
def is_done(self) -> bool:
    """Report whether the job's completion event has fired.

    Returns:
        True if the job is done, False otherwise.

    """
    event = self._event
    return event.is_set()

is_reschedulable()

Check if the job can be rescheduled.

Returns:

Type Description
bool

True if reschedulable, False otherwise.

Source code in src/jobify/_internal/scheduler/job.py
def is_reschedulable(self) -> bool:
    """Report whether the job may be scheduled again.

    Returns:
        True if reschedulable, False otherwise.

    """
    # Only these two states are terminal; everything else may run again.
    terminal = (
        JobStatus.PERMANENTLY_FAILED,
        JobStatus.CANCELLED,
    )
    return self.status not in terminal

result()

Return the result of the job.

Returns:

Type Description
ReturnT

The job result.

Raises:

Type Description
JobFailedError

If the job failed.

JobNotCompletedError

If the job is not yet completed.

Source code in src/jobify/_internal/scheduler/job.py
def result(self) -> ReturnT:
    """Return the result of the job.

    Returns:
        The job result.

    Raises:
        JobFailedError: If the job failed.
        JobNotCompletedError: If the job is not yet completed.

    """
    # A stored result wins even when the status is not SUCCESS; UNSET is
    # the "no result yet" sentinel.
    has_result = self._result is not UNSET
    if self.status is JobStatus.SUCCESS or has_result:
        return self._result
    if self.status is JobStatus.FAILED:
        # Chain the original exception as the cause for full tracebacks.
        raise JobFailedError(
            self.id,
            reason=str(self.exception),
        ) from self.exception
    raise JobNotCompletedError

set_exception(exc, *, status)

Set the exception for the job.

Parameters:

Name Type Description Default
exc Exception

The exception to set.

required
status JobStatus

The status to set.

required
Source code in src/jobify/_internal/scheduler/job.py
def set_exception(self, exc: Exception, *, status: JobStatus) -> None:
    """Record a failure on the job.

    Args:
        exc: The exception to set.
        status: The status to set.

    """
    self.status = status
    self.exception = exc

set_result(val, *, status)

Set the result of the job.

Parameters:

Name Type Description Default
val ReturnT

The job result.

required
status JobStatus

The status to set.

required
Source code in src/jobify/_internal/scheduler/job.py
def set_result(self, val: ReturnT, *, status: JobStatus) -> None:
    """Record the job's outcome value and resulting status.

    Args:
        val: The job result.
        status: The status to set.

    """
    self.status = status
    self._result = val

update(*, exec_at, status)

Update the job schedule and status.

Parameters:

Name Type Description Default
exec_at datetime

The new scheduled time.

required
status JobStatus

The new status.

required
Source code in src/jobify/_internal/scheduler/job.py
def update(self, *, exec_at: datetime, status: JobStatus) -> None:
    """Re-arm the job with a new scheduled time and status.

    Args:
        exec_at: The new scheduled time.
        status: The new status.

    """
    self.exec_at = exec_at
    self.status = status
    # A fresh Event makes wait()/is_done() block until the next run
    # completes; waiters on the old event are unaffected.
    self._event = asyncio.Event()

wait() async

Wait until the job is done.

If the job is already completed, this method returns immediately. Safe for concurrent use by multiple coroutines.

Source code in src/jobify/_internal/scheduler/job.py
async def wait(self) -> None:
    """Block until the job's completion event is set.

    Returns immediately when the job has already finished; any number
    of coroutines may wait concurrently.
    """
    event = self._event
    await event.wait()

JobContext

Bases: NamedTuple

Context object injected into jobs at runtime.

Provides access to job execution context, allowing jobs to inspect their configuration, state, and scheduling builder.

Attributes:

Name Type Description
job Job[Any]

The job instance.

state State

The global application state.

runnable Runnable[Any]

The runnable component.

request_state RequestState

State specific to the current request.

route_options RouteOptions

Configuration options for the route.

jobify_config JobifyConfiguration

Global Jobify configuration.

schedule_builder ScheduleBuilder[Any]

Builder used to construct the schedule.

Source code in src/jobify/_internal/context.py
class JobContext(NamedTuple):
    """Context object injected into jobs at runtime.

    Provides access to job execution context, allowing jobs to inspect
    their configuration, state, and scheduling builder.

    Attributes:
        job: The job instance.
        state: The global application state.
        runnable: The runnable component.
        request_state: State specific to the current request.
        route_options: Configuration options for the route.
        jobify_config: Global Jobify configuration.
        schedule_builder: Builder used to construct the schedule.

    """

    # NamedTuple fields: an immutable snapshot assembled per execution.
    job: Job[Any]
    state: State
    runnable: Runnable[Any]
    request_state: RequestState
    route_options: RouteOptions
    jobify_config: JobifyConfiguration
    schedule_builder: ScheduleBuilder[Any]

JobRouter

Bases: Router

Router for organizing jobs and sub-routers.

NodeRouter is a container for routes and sub-routers, providing a way to structure large applications by grouping related jobs together.

Parameters:

Name Type Description Default
state State | None

Optional initial state for the router.

None
prefix str | None

Optional prefix for route names.

None
lifespan Lifespan[NodeRouter_co] | None

Optional lifespan handler for the router.

None
middleware Sequence[BaseMiddleware] | None

Middleware to apply to jobs in this router.

None
outer_middleware Sequence[BaseOuterMiddleware] | None

Middleware to apply to scheduling process.

None
exception_handlers MappingExceptionHandlers | None

Exception handlers for jobs.

None
route_class type[NodeRoute[..., Any]]

Class to use for creating new routes.

NodeRoute
Source code in src/jobify/_internal/router/node.py
class NodeRouter(Router):
    """Router for organizing jobs and sub-routers.

    `NodeRouter` is a container for routes and sub-routers, providing
    a way to structure large applications by grouping related jobs together.

    Args:
        state: Optional initial state for the router.
        prefix: Optional prefix for route names.
        lifespan: Optional lifespan handler for the router.
        middleware: Middleware to apply to jobs in this router.
        outer_middleware: Middleware to apply to scheduling process.
        exception_handlers: Exception handlers for jobs.
        route_class: Class to use for creating new routes.

    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        state: State | None = None,
        prefix: str | None = None,
        lifespan: Lifespan[NodeRouter_co] | None = None,
        middleware: Sequence[BaseMiddleware] | None = None,
        outer_middleware: Sequence[BaseOuterMiddleware] | None = None,
        exception_handlers: MappingExceptionHandlers | None = None,
        route_class: type[NodeRoute[..., Any]] = NodeRoute,
    ) -> None:
        super().__init__(prefix=prefix)
        # All registration concerns are delegated to the NodeRegistrator;
        # the router only exposes its state/handlers publicly.
        self._registrator: NodeRegistrator = NodeRegistrator(
            state=state,
            lifespan=lifespan,
            middleware=middleware,
            outer_middleware=outer_middleware,
            exception_handlers=exception_handlers,
            route_class=route_class,
        )
        self.state: State = self._registrator.state
        # NOTE(review): reaches into the registrator's private
        # `_exception_handlers` attribute — consider exposing a public
        # accessor on NodeRegistrator instead.
        self.exception_handlers: ExceptionHandlers = (
            self._registrator._exception_handlers
        )

    @property
    @override
    def task(self) -> NodeRegistrator:
        # The registrator doubles as the "task" facade expected by Router.
        return self._registrator

    @property
    @override
    def routes(self) -> Iterator[NodeRoute[..., Any]]:
        # Lazily yield registered routes; insertion order is preserved by
        # the underlying dict.
        yield from self.task._routes.values()

    @property
    @override
    def sub_routers(self) -> list[NodeRouter]:
        # Narrow the base-class list to the concrete NodeRouter type.
        return cast("list[NodeRouter]", self._sub_routers)

JobStatus

Bases: str, Enum

The status of a job.

Attributes:

Name Type Description
PENDING

Job is waiting to be processed.

SCHEDULED

Job is scheduled for future execution.

RUNNING

Job is currently executing.

CANCELLED

Job was cancelled.

SUCCESS

Job completed successfully.

FAILED

Job failed execution.

TIMEOUT

Job timed out.

PERMANENTLY_FAILED

Job failed and exhausted all retry attempts.

Source code in src/jobify/_internal/common/constants.py
@unique
class JobStatus(str, Enum):
    """Lifecycle states a job can occupy.

    Inherits from ``str`` so members compare equal to their plain string
    values and serialize naturally.

    Attributes:
        PENDING: Job is waiting to be processed.
        SCHEDULED: Job is scheduled for future execution.
        RUNNING: Job is currently executing.
        CANCELLED: Job was cancelled.
        SUCCESS: Job completed successfully.
        FAILED: Job failed execution.
        TIMEOUT: Job timed out.
        PERMANENTLY_FAILED: Job failed and exhausted all retry attempts.

    """

    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    CANCELLED = "cancelled"
    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"
    PERMANENTLY_FAILED = "permanently_failed"

Jobify

Bases: RootRouter

Jobify is the main app for scheduling and managing background jobs.

It provides a flexible and extensible framework for defining, running, and persisting jobs, supporting various executors, middleware, and serialization options.

Attributes:

Name Type Description
configs JobifyConfiguration

Configuration object for the application.

plugins set[Plugin]

Set of registered plugins.

Source code in src/jobify/jobify.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
class Jobify(RootRouter):
    """Jobify is the main app for scheduling and managing background jobs.

    It provides a flexible and extensible framework for defining, running,
    and persisting jobs, supporting various executors, middleware, and
    serialization options.

    Attributes:
        configs: Configuration object for the application.
        plugins: Set of registered plugins.

    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        tz: ZoneInfo | None = None,
        state: State | None = None,
        dumper: Dumper | None = None,
        loader: Loader | None = None,
        storage: Storage | Literal[False] = UNSET,
        lifespan: Lifespan[AppT] | None = None,
        serializer: Serializer | None = None,
        middleware: Sequence[BaseMiddleware] | None = None,
        outer_middleware: Sequence[BaseOuterMiddleware] | None = None,
        cron_factory: CronFactory = create_crontab,
        loop_factory: LoopFactory = asyncio.get_running_loop,
        exception_handlers: MappingExceptionHandlers | None = None,
        threadpool_executor: ThreadPoolExecutor | None = None,
        processpool_executor: ProcessPoolExecutor | None = None,
        route_class: type[RootRoute[..., Any]] = RootRoute,
        plugins: Sequence[Plugin] = (),
        uuid_generator: UUIDGenerator = uuid.uuid4,
    ) -> None:
        """Initialize a `Jobify` instance.

        Args:
            tz: Timezone info, defaults to UTC.
            state: Optional initial application state.
            dumper: Dumper for serialization.
            loader: Loader for deserialization.
            storage: Storage backend, defaults to SQLite if not provided.
            lifespan: Optional lifespan handler.
            serializer: Serializer for job messages.
            middleware: List of middleware components.
            outer_middleware: List of outer middleware components.
            cron_factory: Factory function for cron parsing.
            loop_factory: Factory function for the event loop.
            exception_handlers: Mapping of exception types to handlers.
            threadpool_executor: Executor for thread-based jobs.
            processpool_executor: Executor for process-based jobs.
            route_class: Class to use for root routes.
            plugins: List of plugins to register.
            uuid_generator: uuid generator factory.

        """
        getloop = cache_result(loop_factory)
        tz = tz or ZoneInfo("UTC")

        if storage is False:
            storage = DummyStorage()
        elif storage is UNSET:
            storage = SQLiteStorage()

        if isinstance(storage, SQLiteStorage):
            storage.getloop = getloop
            storage.threadpool = threadpool_executor
            storage.tz = tz

        system_types = (
            Message,
            Cron,
            PushArguments,
            AtArguments,
            CronArguments,
            MisfirePolicy,
            GracePolicy,
        )
        if serializer is None:
            serializer = (
                ExtendedJSONSerializer(system_types)
                if dumper is None and loader is None
                else JSONSerializer()
            )
        elif isinstance(serializer, ExtendedJSONSerializer):
            serializer.add_system_types(system_types)

        if dumper is None:
            dumper = DummyDumper()
        if loader is None:
            loader = DummyLoader()

        self.configs: JobifyConfiguration = JobifyConfiguration(
            tz=tz,
            dumper=dumper,
            loader=loader,
            storage=storage,
            getloop=getloop,
            serializer=serializer,
            worker_pools=WorkerPools(
                _processpool=processpool_executor,
                threadpool=threadpool_executor,
            ),
            cron_factory=cron_factory,
            uuid_generator=uuid_generator,
        )
        idle_event = asyncio.Event()
        idle_event.set()
        super().__init__(
            state=state,
            lifespan=lifespan,
            middleware=middleware,
            outer_middleware=outer_middleware,
            task_tracker=TaskTracker(
                pending_jobs={},
                pending_tasks={},
                idle_event=idle_event,
            ),
            jobify_config=self.configs,
            exception_handlers=exception_handlers,
            route_class=route_class,
        )
        self._captured_signals: list[int] = []
        self.plugins: set[Plugin] = set(plugins)

    def add_plugin(self, plug: Plugin, /) -> None:
        """Register a plugin instance for application lifecycle hooks."""
        self.plugins.add(plug)

    def find_job(self, id_: str, /) -> Job[ReturnT] | None:
        """Find an active job by its ID.

        Args:
            id_: Unique identifier of the job.

        Returns:
            The `Job` instance if it's currently pending or running,
            otherwise `None`.

        """
        return self.task._task_tracker.pending_jobs.get(id_)

    def get_active_jobs(self) -> list[Job[Any]]:
        """Return a list of all currently active jobs."""
        return list(self.task._task_tracker.pending_jobs.values())

    async def __aenter__(self) -> Self:
        """Enter the Jobify context manager.

        Returns:
            The initialized Jobify instance ready for use.

        Raises:
            Exception: Any exception raised by `startup()` method.

        """
        await self.startup()
        return self

    async def startup(self) -> None:
        """Initialize the Jobify application.

        This method:
        1. Marks the application as started
        2. Propagates startup events to all routers and their registrators
        3. Schedules any pending cron jobs

        Raises:
            RuntimeError: If application startup fails due to configuration issues or
                router initialization errors.

        """

        def chain_middlewares() -> Iterator[BaseMiddleware]:
            for router in self.chain_tail:
                yield from router.task._middleware

        for mid in chain_middlewares():
            if isinstance(mid, Plugin):
                self.add_plugin(mid)

        for plug in self.plugins:
            await plug.startup(self)

        self.propagate_real_routes()

        self.configs.app_started = True

        await self.configs.storage.startup()
        await self._propagate_startup(self)
        await self._restore_schedules()

        logger.info("Jobify startup complete. Ready to schedule jobs.")

    async def _restore_schedules(self) -> None:
        """Restore schedules.

        Phase 1: Synchronize Declarative Cron Jobs (Code and Database)
            - If there are changes or new jobs, run the full process
              (middleware and database persistence).
            - If no changes, silently restore jobs in memory.
        Phase 2: Restore Imperative Jobs (Database Only)
            - Restore valid jobs and delete obsolete ones from the database.
        """
        crons_def: CronsDefinition = self.task.state.pop(CRONS_DEF_KEY, {})
        schedules = await self.configs.storage.get_schedules()

        to_delete: list[str] = []
        processed_jobs: set[str] = set()
        db_map = {sch.job_id: sch for sch in schedules}

        # --- PHASE 1: Declarative Crons ---
        for job_id, (rout, cron_def) in crons_def.items():
            processed_jobs.add(job_id)
            builder = rout.schedule(*cron_def.args, **cron_def.kwargs)

            if (sch_in_db := db_map.get(job_id)) is not None:
                restored = self._restore_job_from_storage(sch_in_db)
                if (
                    restored is not None
                    and (msg := restored[1])
                    and isinstance(msg.trigger, CronArguments)
                ):
                    self._start_restored_job_in_memory(
                        restored[0],
                        msg.trigger,
                        sch_in_db.next_run_at,
                    )
                    if msg.trigger.cron == cron_def:
                        continue

            await builder.cron(cron_def, job_id=job_id, replace=True, force=True)

        # --- PHASE 2: Dynamic/Imperative Jobs & Cleanup ---
        for job_id, sch in db_map.items():
            if job_id in processed_jobs:
                continue

            if job_id.endswith(PATCH_CRON_DEF_ID):
                to_delete.append(job_id)
                continue

            if (restored := self._restore_job_from_storage(sch)) is not None:
                builder, msg, _ = restored
                self._start_restored_job_in_memory(
                    builder,
                    msg.trigger,
                    sch.next_run_at,
                )
                continue

            to_delete.append(job_id)

        if to_delete:
            logger.info("Cleaning up %d obsolete/broken jobs", len(to_delete))
            await self.configs.storage.delete_schedule_many(to_delete)

    def _restore_job_from_storage(
        self,
        sch: ScheduledJob,
    ) -> tuple[ScheduleBuilder[Any], Message, inspect.BoundArguments] | None:
        """Safely deserializes and binds a job from storage.

        Returns:
            Tuple (Builder, Message, BoundArguments) or None if failed.

        """
        route = self.task._routes.get(sch.name)
        if not route or route.options.get("durable") is False:
            return None
        try:
            raw_msg = self.configs.serializer.loadb(sch.message)
            msg = self.configs.loader.load(raw_msg, Message)

            for name, raw_arg in msg.arguments.items():
                param_type = route.func_spec.type_params[name]
                msg.arguments[name] = self.configs.loader.load(
                    raw_arg,
                    param_type,
                )
            bound = route.func_spec.signature.bind(**msg.arguments)
        except (KeyError, TypeError, ValueError) as exc:
            logger.warning(
                "Failed to revive job %s (%s). Reason: %s",
                sch.job_id,
                sch.name,
                exc,
            )
            return None
        else:
            builder = route.create_builder(bound)
            return builder, msg, bound

    def _start_restored_job_in_memory(
        self,
        builder: ScheduleBuilder[Any],
        trigger: Triggers,
        db_next_run_at: datetime,
    ) -> None:
        """Start the internal timer for a restored job.

        IMPORTANT: This bypasses middleware and database persistence.
        It handles misfire policy to ensure we do not execute old jobs
        incorrectly.
        """
        match trigger:
            case PushArguments():
                builder._push(trigger.job_id, exec_at=db_next_run_at)
            case AtArguments():
                builder._at(trigger.at, trigger.job_id)
            case CronArguments():
                parser = self.configs.cron_factory(trigger.cron.expression)
                next_run_at = handle_misfire_policy(
                    cron_parser=parser,
                    next_run_at=db_next_run_at,
                    real_now=builder.now(),
                    policy=trigger.cron.misfire_policy,
                )
                builder._cron(
                    cron=trigger.cron,
                    job_id=trigger.job_id,
                    offset=trigger.offset,
                    next_run_at=next_run_at,
                    cron_parser=parser,
                    run_count=trigger.run_count,
                )

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None = None,
        _exc_val: BaseException | None = None,
        _exc_tb: TracebackType | None = None,
    ) -> None:
        """Exit the Jobify context manager.

        Note:
            This method ensures proper shutdown regardless of whether an
            exception occurred in the managed context. The exception parameters
            are ignored as shutdown should proceed even if the context failed.

        """
        await self.shutdown()

    async def shutdown(self) -> None:
        """Gracefully shut down the Jobify application.

        This method performs a structured shutdown:
        1. Marks the application as stopped (`app_started = False`).
        2. Propagates shutdown events to all routers/components.
        3. Cancels all scheduled future jobs in the registry
           (`_jobs_registry`).
        4. Closes the jobify configuration (e.g., stopping the internal
           scheduler).
        5. Cancels all currently running tasks (in `_tasks_registry`), waits
           for their completion, and explicitly clears the task registry.

        Note:
            The method uses `return_exceptions=True` when gathering cancelled
            tasks to prevent shutdown from being interrupted by task exceptions.

        """
        for plug in self.plugins:
            await plug.shutdown()

        self.configs.app_started = False

        if jobs := tuple(self.task._task_tracker.pending_jobs.values()):
            for job in jobs:
                job._cancel()

        if tasks := self.task._task_tracker.pending_tasks.values():  # pragma: no cover
            for task in tuple(tasks):
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

        self.configs.worker_pools.close()
        await self._propagate_shutdown()
        await self.configs.storage.shutdown()
        self._raise_captured_signals()

        logger.info("Jobify shutdown complete.")

    def _raise_captured_signals(self) -> None:
        # If we did gracefully shut down due to a signal, try to
        # trigger the expected behaviour now; multiple signals would be
        # done LIFO, see https://stackoverflow.com/questions/48434964
        for captured_signal in reversed(self._captured_signals):
            signal.raise_signal(captured_signal)

    async def wait_all(self, timeout: float | None = None) -> None:
        """Wait for all currently scheduled jobs to complete.

        This method waits until all jobs currently registered have finished
        executing (with statuses of SUCCESS, FAILED, or TIMEOUT). This is
        useful in situations where it's important to ensure that background
        tasks have completed before moving on.

        The method sets an internal event when the following condition is met:
        1. No jobs remain in the jobs registry (`_jobs_registry`)

        Args:
            timeout (optional): The maximum time in seconds to wait for the
                jobs to complete. If not specified, the default value of `None`
                will be used, which means the job will wait indefinitely. If a
                timeout is specified and it is reached, the method will raise
                an `asyncio.TimeoutError`.

        """
        idle_event = self.task._task_tracker.idle_event
        with self._capture_signals():
            await asyncio.wait_for(idle_event.wait(), timeout=timeout)

    @contextmanager
    def _capture_signals(self) -> Generator[None]:
        handled_signals = [
            signal.SIGINT,  # Unix signal 2. Sent by Ctrl+C.
            signal.SIGTERM,  # Unix signal 15. Sent by `kill <pid>`.
        ]
        if sys.platform == "win32":  # pragma: no cover
            handled_signals.append(
                signal.SIGBREAK  # Windows signal 21. Sent by Ctrl+Break.
            )

        # Signals can only be listened to from the main thread.
        if threading.current_thread() is not threading.main_thread():
            yield
            return
        # always use signal.signal, even if loop.add_signal_handler is
        # available this allows to restore previous signal handlers later on
        original_handlers = {
            sig: signal.signal(sig, self._handle_exit) for sig in handled_signals
        }
        try:
            yield
        finally:
            for sig, handler in original_handlers.items():
                signal.signal(sig, handler)

    def _handle_exit(self, sig: int, _: FrameType | None) -> None:
        self._captured_signals.append(sig)
        loop = self.configs.getloop()
        idle_event = self.task._task_tracker.idle_event
        loop.call_soon_threadsafe(idle_event.set)

__aenter__() async

Enter the Jobify context manager.

Returns:

Type Description
Self

The initialized Jobify instance ready for use.

Raises:

Type Description
Exception

Any exception raised by startup() method.

Source code in src/jobify/jobify.py
async def __aenter__(self) -> Self:
    """Enter the Jobify context manager.

    Returns:
        The initialized Jobify instance ready for use.

    Raises:
        Exception: Any exception raised by `startup()` method.

    """
    await self.startup()
    return self

__aexit__(_exc_type=None, _exc_val=None, _exc_tb=None) async

Exit the Jobify context manager.

Note

This method ensures proper shutdown regardless of whether an exception occurred in the managed context. The exception parameters are ignored as shutdown should proceed even if the context failed.

Source code in src/jobify/jobify.py
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None = None,
    _exc_val: BaseException | None = None,
    _exc_tb: TracebackType | None = None,
) -> None:
    """Exit the Jobify context manager.

    Note:
        This method ensures proper shutdown regardless of whether an
        exception occurred in the managed context. The exception parameters
        are ignored as shutdown should proceed even if the context failed.

    """
    await self.shutdown()

__init__(*, tz=None, state=None, dumper=None, loader=None, storage=UNSET, lifespan=None, serializer=None, middleware=None, outer_middleware=None, cron_factory=create_crontab, loop_factory=asyncio.get_running_loop, exception_handlers=None, threadpool_executor=None, processpool_executor=None, route_class=RootRoute, plugins=(), uuid_generator=uuid.uuid4)

Initialize a Jobify instance.

Parameters:

Name Type Description Default
tz ZoneInfo | None

Timezone info, defaults to UTC.

None
state State | None

Optional initial application state.

None
dumper Dumper | None

Dumper for serialization.

None
loader Loader | None

Loader for deserialization.

None
storage Storage | Literal[False]

Storage backend, defaults to SQLite if not provided.

UNSET
lifespan Lifespan[AppT] | None

Optional lifespan handler.

None
serializer Serializer | None

Serializer for job messages.

None
middleware Sequence[BaseMiddleware] | None

List of middleware components.

None
outer_middleware Sequence[BaseOuterMiddleware] | None

List of outer middleware components.

None
cron_factory CronFactory

Factory function for cron parsing.

create_crontab
loop_factory LoopFactory

Factory function for the event loop.

get_running_loop
exception_handlers MappingExceptionHandlers | None

Mapping of exception types to handlers.

None
threadpool_executor ThreadPoolExecutor | None

Executor for thread-based jobs.

None
processpool_executor ProcessPoolExecutor | None

Executor for process-based jobs.

None
route_class type[RootRoute[..., Any]]

Class to use for root routes.

RootRoute
plugins Sequence[Plugin]

List of plugins to register.

()
uuid_generator UUIDGenerator

uuid generator factory.

uuid4
Source code in src/jobify/jobify.py
def __init__(  # noqa: PLR0913
    self,
    *,
    tz: ZoneInfo | None = None,
    state: State | None = None,
    dumper: Dumper | None = None,
    loader: Loader | None = None,
    storage: Storage | Literal[False] = UNSET,
    lifespan: Lifespan[AppT] | None = None,
    serializer: Serializer | None = None,
    middleware: Sequence[BaseMiddleware] | None = None,
    outer_middleware: Sequence[BaseOuterMiddleware] | None = None,
    cron_factory: CronFactory = create_crontab,
    loop_factory: LoopFactory = asyncio.get_running_loop,
    exception_handlers: MappingExceptionHandlers | None = None,
    threadpool_executor: ThreadPoolExecutor | None = None,
    processpool_executor: ProcessPoolExecutor | None = None,
    route_class: type[RootRoute[..., Any]] = RootRoute,
    plugins: Sequence[Plugin] = (),
    uuid_generator: UUIDGenerator = uuid.uuid4,
) -> None:
    """Initialize a `Jobify` instance.

    Args:
        tz: Timezone info, defaults to UTC.
        state: Optional initial application state.
        dumper: Dumper for serialization.
        loader: Loader for deserialization.
        storage: Storage backend, defaults to SQLite if not provided.
        lifespan: Optional lifespan handler.
        serializer: Serializer for job messages.
        middleware: List of middleware components.
        outer_middleware: List of outer middleware components.
        cron_factory: Factory function for cron parsing.
        loop_factory: Factory function for the event loop.
        exception_handlers: Mapping of exception types to handlers.
        threadpool_executor: Executor for thread-based jobs.
        processpool_executor: Executor for process-based jobs.
        route_class: Class to use for root routes.
        plugins: List of plugins to register.
        uuid_generator: uuid generator factory.

    """
    getloop = cache_result(loop_factory)
    tz = tz or ZoneInfo("UTC")

    if storage is False:
        storage = DummyStorage()
    elif storage is UNSET:
        storage = SQLiteStorage()

    if isinstance(storage, SQLiteStorage):
        storage.getloop = getloop
        storage.threadpool = threadpool_executor
        storage.tz = tz

    system_types = (
        Message,
        Cron,
        PushArguments,
        AtArguments,
        CronArguments,
        MisfirePolicy,
        GracePolicy,
    )
    if serializer is None:
        serializer = (
            ExtendedJSONSerializer(system_types)
            if dumper is None and loader is None
            else JSONSerializer()
        )
    elif isinstance(serializer, ExtendedJSONSerializer):
        serializer.add_system_types(system_types)

    if dumper is None:
        dumper = DummyDumper()
    if loader is None:
        loader = DummyLoader()

    self.configs: JobifyConfiguration = JobifyConfiguration(
        tz=tz,
        dumper=dumper,
        loader=loader,
        storage=storage,
        getloop=getloop,
        serializer=serializer,
        worker_pools=WorkerPools(
            _processpool=processpool_executor,
            threadpool=threadpool_executor,
        ),
        cron_factory=cron_factory,
        uuid_generator=uuid_generator,
    )
    idle_event = asyncio.Event()
    idle_event.set()
    super().__init__(
        state=state,
        lifespan=lifespan,
        middleware=middleware,
        outer_middleware=outer_middleware,
        task_tracker=TaskTracker(
            pending_jobs={},
            pending_tasks={},
            idle_event=idle_event,
        ),
        jobify_config=self.configs,
        exception_handlers=exception_handlers,
        route_class=route_class,
    )
    self._captured_signals: list[int] = []
    self.plugins: set[Plugin] = set(plugins)

add_plugin(plug)

Register a plugin instance for application lifecycle hooks.

Source code in src/jobify/jobify.py
def add_plugin(self, plug: Plugin, /) -> None:
    """Register a plugin instance for application lifecycle hooks."""
    self.plugins.add(plug)

find_job(id_)

Find an active job by its ID.

Parameters:

Name Type Description Default
id_ str

Unique identifier of the job.

required

Returns:

Type Description
Job[ReturnT] | None

The Job instance if it's currently pending or running,

Job[ReturnT] | None

otherwise None.

Source code in src/jobify/jobify.py
def find_job(self, id_: str, /) -> Job[ReturnT] | None:
    """Find an active job by its ID.

    Args:
        id_: Unique identifier of the job.

    Returns:
        The `Job` instance if it's currently pending or running,
        otherwise `None`.

    """
    return self.task._task_tracker.pending_jobs.get(id_)

get_active_jobs()

Return a list of all currently active jobs.

Source code in src/jobify/jobify.py
def get_active_jobs(self) -> list[Job[Any]]:
    """Return a list of all currently active jobs."""
    return list(self.task._task_tracker.pending_jobs.values())

shutdown() async

Gracefully shut down the Jobify application.

This method performs a structured shutdown: 1. Marks the application as stopped (app_started = False). 2. Propagates shutdown events to all routers/components. 3. Cancels all scheduled future jobs in the registry (_jobs_registry). 4. Closes the jobify configuration (e.g., stopping the internal scheduler). 5. Cancels all currently running tasks (in _tasks_registry), waits for their completion, and explicitly clears the task registry.

Note

The method uses return_exceptions=True when gathering cancelled tasks to prevent shutdown from being interrupted by task exceptions.

Source code in src/jobify/jobify.py
async def shutdown(self) -> None:
    """Gracefully shut down the Jobify application.

    This method performs a structured shutdown:
    1. Marks the application as stopped (`app_started = False`).
    2. Propagates shutdown events to all routers/components.
    3. Cancels all scheduled future jobs in the registry
       (`_jobs_registry`).
    4. Closes the jobify configuration (e.g., stopping the internal
       scheduler).
    5. Cancels all currently running tasks (in `_tasks_registry`), waits
       for their completion, and explicitly clears the task registry.

    Note:
        The method uses `return_exceptions=True` when gathering cancelled
        tasks to prevent shutdown from being interrupted by task exceptions.

    """
    for plug in self.plugins:
        await plug.shutdown()

    self.configs.app_started = False

    if jobs := tuple(self.task._task_tracker.pending_jobs.values()):
        for job in jobs:
            job._cancel()

    if tasks := self.task._task_tracker.pending_tasks.values():  # pragma: no cover
        for task in tuple(tasks):
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    self.configs.worker_pools.close()
    await self._propagate_shutdown()
    await self.configs.storage.shutdown()
    self._raise_captured_signals()

    logger.info("Jobify shutdown complete.")

startup() async

Initialize the Jobify application.

This method: 1. Marks the application as started 2. Propagates startup events to all routers and their registrators 3. Schedules any pending cron jobs

Raises:

Type Description
RuntimeError

If application startup fails due to configuration issues or router initialization errors.

Source code in src/jobify/jobify.py
async def startup(self) -> None:
    """Initialize the Jobify application.

    This method:
    1. Marks the application as started
    2. Propagates startup events to all routers and their registrators
    3. Schedules any pending cron jobs

    Raises:
        RuntimeError: If application startup fails due to configuration issues or
            router initialization errors.

    """

    def chain_middlewares() -> Iterator[BaseMiddleware]:
        for router in self.chain_tail:
            yield from router.task._middleware

    for mid in chain_middlewares():
        if isinstance(mid, Plugin):
            self.add_plugin(mid)

    for plug in self.plugins:
        await plug.startup(self)

    self.propagate_real_routes()

    self.configs.app_started = True

    await self.configs.storage.startup()
    await self._propagate_startup(self)
    await self._restore_schedules()

    logger.info("Jobify startup complete. Ready to schedule jobs.")

wait_all(timeout=None) async

Wait for all currently scheduled jobs to complete.

This method waits until all jobs currently registered have finished executing (with statuses of SUCCESS, FAILED, or TIMEOUT). This is useful in situations where it's important to ensure that background tasks have completed before moving on.

The method sets an internal event when the following condition is met: 1. No jobs remain in the jobs registry (_jobs_registry)

Parameters:

Name Type Description Default
timeout optional

The maximum time in seconds to wait for the jobs to complete. If not specified, the default value of None will be used, which means the job will wait indefinitely. If a timeout is specified and it is reached, the method will raise an asyncio.TimeoutError.

None
Source code in src/jobify/jobify.py
async def wait_all(self, timeout: float | None = None) -> None:
    """Wait for all currently scheduled jobs to complete.

    This method waits until all jobs currently registered have finished
    executing (with statuses of SUCCESS, FAILED, or TIMEOUT). This is
    useful in situations where it's important to ensure that background
    tasks have completed before moving on.

    The method sets an internal event when the following condition is met:
    1. No jobs remain in the jobs registry (`_jobs_registry`)

    Args:
        timeout (optional): The maximum time in seconds to wait for the
            jobs to complete. If not specified, the default value of `None`
            will be used, which means the job will wait indefinitely. If a
            timeout is specified and it is reached, the method will raise
            an `asyncio.TimeoutError`.

    """
    idle_event = self.task._task_tracker.idle_event
    with self._capture_signals():
        await asyncio.wait_for(idle_event.wait(), timeout=timeout)

MisfirePolicy

Bases: str, Enum

Policy for handling jobs that missed their scheduled run time.

Attributes:

Name Type Description
ALL

Run all missed executions.

SKIP

Skip missed executions and schedule the next one.

ONCE

Run once immediately.

Source code in src/jobify/_internal/scheduler/misfire_policy.py
class MisfirePolicy(str, Enum):
    """Policy for handling jobs that missed their scheduled run time.

    Attributes:
        ALL: Run all missed executions.
        SKIP: Skip missed executions and schedule the next one.
        ONCE: Run once immediately.

    """

    ALL = "all"
    SKIP = "skip"
    ONCE = "once"

    @staticmethod
    def GRACE(t: timedelta, /) -> GracePolicy:  # noqa: N802
        return GracePolicy(t)

OuterContext

Context object passed to middleware during the scheduling process.

This object holds all information required to inspect, modify, or intercept a job before it is officially scheduled.

Attributes:

Name Type Description
job

The job instance being scheduled.

state

The global application state.

trigger

The trigger mechanism (e.g., Cron, Push).

runnable

The runnable component (function/method) being scheduled.

arguments

The arguments bound to the job function.

func_spec

Inspection details of the job function.

is_force

Whether the job is forced to run.

is_persist

Whether the job should be persisted to storage.

is_replace

Whether the job replaces an existing one.

route_options

Configuration options for the route.

jobify_config

Global Jobify configuration.

request_state

State specific to the current request.

persist_job_hook

Callback to persist the job.

schedule_hook

Callback to schedule the job.

schedule_builder

Builder used to construct the schedule.

Source code in src/jobify/_internal/context.py
class OuterContext:
    """Scheduling-time context passed to outer middleware.

    Bundles everything middleware needs to inspect, modify, or
    intercept a job before it is officially scheduled.

    Attributes:
        job: The job instance being scheduled.
        state: The global application state.
        trigger: The trigger mechanism (e.g., Cron, Push).
        runnable: The runnable component (function/method) being scheduled.
        arguments: The arguments bound to the job function.
        func_spec: Inspection details of the job function.
        is_force: Whether the job is forced to run.
        is_persist: Whether the job should be persisted to storage.
        is_replace: Whether the job replaces an existing one.
        route_options: Configuration options for the route.
        jobify_config: Global Jobify configuration.
        request_state: State specific to the current request.
        persist_job_hook: Callback to persist the job.
        schedule_hook: Callback to schedule the job.
        schedule_builder: Builder used to construct the schedule.

    """

    __slots__: tuple[str, ...] = (
        "arguments",
        "func_spec",
        "is_force",
        "is_persist",
        "is_replace",
        "job",
        "jobify_config",
        "persist_job_hook",
        "request_state",
        "route_options",
        "runnable",
        "schedule_builder",
        "schedule_hook",
        "state",
        "trigger",
    )

    def __init__(  # noqa: PLR0913
        self,
        *,
        job: Job[Any],
        state: State,
        trigger: Triggers,
        runnable: Runnable[Any],
        arguments: dict[str, Any],
        func_spec: FuncSpec[Any],
        is_force: bool,
        is_persist: bool,
        is_replace: bool,
        route_options: RouteOptions,
        jobify_config: JobifyConfiguration,
        request_state: RequestState,
        persist_job_hook: Callable[[str, datetime, Triggers], Awaitable[None]],
        schedule_hook: Callable[[], asyncio.Handle],
        schedule_builder: ScheduleBuilder[Any],
    ) -> None:
        # Plain attribute captures; assignment order is not significant.
        self.trigger = trigger
        self.job = job
        self.runnable = runnable
        self.arguments = arguments
        self.func_spec = func_spec
        self.state = state
        self.request_state = request_state
        self.is_force = is_force
        self.is_persist = is_persist
        self.is_replace = is_replace
        self.route_options = route_options
        self.jobify_config = jobify_config
        self.schedule_builder = schedule_builder
        self.schedule_hook = schedule_hook
        self.persist_job_hook = persist_job_hook

Plugin

Bases: Protocol

Lifecycle hooks for integrating extensions into a Jobify app.

Source code in src/jobify/jobify.py
@runtime_checkable
class Plugin(Protocol):
    """Structural interface for `Jobify` extensions.

    Implementers receive lifecycle callbacks from the application:
    `startup` when the app starts and `shutdown` before it stops.
    """

    async def startup(self, app: Jobify) -> None:
        """Initialize the plugin once the application has started."""
        ...

    async def shutdown(self) -> None:
        """Release the plugin's resources before the application shuts down."""
        ...

shutdown() async

Run plugin cleanup before the application shuts down.

Source code in src/jobify/jobify.py
async def shutdown(self) -> None:
    """Run plugin cleanup before the application shuts down."""
    # Protocol member: the body is a stub; implementations supply the teardown.
    ...

startup(app) async

Run plugin initialization when the application starts.

Source code in src/jobify/jobify.py
async def startup(self, app: Jobify) -> None:
    """Run plugin initialization when the application starts."""
    # Protocol member: implementations perform their setup with the running app.
    ...

RequestState

Bases: State

An object that can be used to store state specific to a request.

Source code in src/jobify/_internal/common/datastructures.py
class RequestState(State):
    """A `State` container for data scoped to a single request."""

RunMode

Bases: str, Enum

The execution mode of a job.

Attributes:

Name Type Description
MAIN

Execute in the main event loop.

THREAD

Execute in a worker thread.

PROCESS

Execute in a worker process.

Source code in src/jobify/_internal/common/constants.py
@unique
class RunMode(str, Enum):
    """Where a job executes.

    Attributes:
        MAIN: Execute in the main event loop.
        THREAD: Execute in a worker thread.
        PROCESS: Execute in a worker process.

    """

    MAIN = "main"
    THREAD = "thread"
    PROCESS = "process"

Runnable

Bases: Generic[ReturnT]

Represents a job function that is ready for execution.

Runnable encapsulates the function, its bound arguments, and the execution strategy (async, thread pool, or process pool).

Attributes:

Name Type Description
name str

The name of the job.

strategy RunStrategy[..., ReturnT]

The execution strategy used to run the job.

bound BoundArguments

The bound arguments of the job function.

origin_arguments

The original arguments before any modifications.

func_spec

Inspection details of the job function.

Source code in src/jobify/_internal/runners.py
class Runnable(Generic[ReturnT]):
    """A job function packaged with its arguments and execution strategy.

    Encapsulates the target function, the arguments already bound to it,
    and the strategy that runs it (async, thread pool, or process pool).

    Attributes:
        name: The name of the job.
        strategy: The execution strategy used to run the job.
        bound: The bound arguments of the job function.
        origin_arguments: The original arguments before any modifications.
        func_spec: Inspection details of the job function.

    """

    __slots__: tuple[str, ...] = (
        "bound",
        "func_spec",
        "name",
        "origin_arguments",
        "strategy",
    )

    def __init__(
        self,
        *,
        name: str,
        bound: inspect.BoundArguments,
        strategy: RunStrategy[..., ReturnT],
        func_spec: FuncSpec[ReturnT],
    ) -> None:
        self.func_spec = func_spec
        self.name: str = name
        self.bound: inspect.BoundArguments = bound
        # Snapshot the arguments so later mutations of `bound` don't lose them.
        self.origin_arguments = bound.arguments.copy()
        self.strategy: RunStrategy[..., ReturnT] = strategy

    def __call__(self) -> Awaitable[ReturnT]:
        """Run the job through its strategy with the pre-bound arguments."""
        binding = self.bound
        return self.strategy(*binding.args, **binding.kwargs)

__call__()

Execute the job using the assigned strategy.

Source code in src/jobify/_internal/runners.py
def __call__(self) -> Awaitable[ReturnT]:
    """Execute the job using the assigned strategy."""
    # Delegates to the strategy (async / thread pool / process pool)
    # with the arguments that were bound at construction time.
    return self.strategy(*self.bound.args, **self.bound.kwargs)

ScheduleBuilder

Bases: Generic[ReturnT]

Builder class for scheduling jobs.

This class provides methods to schedule tasks to run immediately (push), at a specific time (at), or according to a cron schedule (cron).

Attributes:

Name Type Description
func_spec FuncSpec[ReturnT]

Function specification.

name str

Name of the scheduled task.

route_options RouteOptions

Route options for the task.

Source code in src/jobify/_internal/scheduler/scheduler.py
(source lines 46-555)
class ScheduleBuilder(Generic[ReturnT]):
    """Builder class for scheduling jobs.

    This class provides methods to schedule tasks to run immediately (push),
    at a specific time (at), or according to a cron schedule (cron).

    Attributes:
        func_spec: Function specification.
        name: Name of the scheduled task.
        route_options: Route options for the task.

    """

    __slots__: tuple[str, ...] = (
        "_chain_middleware",
        "_chain_outer_middleware",
        "_configs",
        "_exception_handlers",
        "_is_persist",
        "_runnable",
        "_state",
        "_task_tracker",
        "func_spec",
        "name",
        "route_options",
    )

    def __init__(  # noqa: PLR0913
        self,
        *,
        name: str,
        state: State,
        task_tracker: TaskTracker,
        jobify_config: JobifyConfiguration,
        runnable: Runnable[ReturnT],
        chain_middleware: CallNext,
        chain_outer_middleware: CallNextOuter,
        func_spec: FuncSpec[ReturnT],
        exception_handlers: ExceptionHandlers,
        options: RouteOptions,
        is_persist: bool,
    ) -> None:
        """Initialize the ScheduleBuilder.

        Args:
            name: The task name.
            state: Application state.
            task_tracker: Tracker for scheduled tasks.
            jobify_config: Jobify configuration.
            runnable: The runnable task.
            chain_middleware: Middleware for task execution.
            chain_outer_middleware: Middleware for scheduling.
            func_spec: Function specification.
            exception_handlers: Exception handlers.
            options: Route options.
            is_persist: Whether the schedule should be persisted.

        """
        self._state: State = state
        self._task_tracker: TaskTracker = task_tracker
        self._configs: JobifyConfiguration = jobify_config
        self._runnable: Runnable[ReturnT] = runnable
        self._chain_middleware: CallNext = chain_middleware
        self._chain_outer_middleware: CallNextOuter = chain_outer_middleware
        self._exception_handlers: ExceptionHandlers = exception_handlers
        # Gates add/delete of persisted schedules throughout this builder.
        self._is_persist: bool = is_persist
        self.func_spec: FuncSpec[ReturnT] = func_spec
        self.route_options: RouteOptions = options
        self.name: str = name

    def now(self) -> datetime:
        """Return the current datetime in the configured timezone.

        Returns:
            Current datetime.

        """
        return datetime.now(tz=self._configs.tz)

    async def cron(
        self,
        cron: str | Cron,
        *,
        job_id: str,
        replace: bool = False,
        force: bool = False,
    ) -> Job[ReturnT]:
        """Schedules a task based on a cron expression.

        Args:
            cron: Cron expression string or object.
            job_id: Unique identifier for the job.
            replace: Whether to replace an existing job if it exists.
            force: Whether to force scheduling even if parameters match.

        Returns:
            The scheduled Job instance.

        """
        job_id, exists_job = self._ensure_job_id(job_id, replace=replace)
        if isinstance(cron, str):
            # Promote a bare crontab string to a full Cron configuration.
            cron = Cron(cron)

        real_now = self.now()

        if exists_job is not None:
            # Rescheduling: reuse the existing job and its cron context.
            job = exists_job
            ctx = cast("CronContext[ReturnT]", exists_job._cron_context)

            if not force and cron == ctx.cron:
                # Identical schedule already registered; keep it as-is.
                logger.warning(JOB_ALREADY_EXISTS.format(job_id=job_id, schedule=cron))
                return exists_job

            old_cron = ctx.cron
            current_offset = ctx.offset
        else:
            # Fresh job; the cron context is created once the first run time is known.
            job = Job[ReturnT](
                exec_at=real_now,
                job_id=job_id,
                unregister_hook=self._task_tracker.unregister_job,
                storage=self._configs.storage,
            )
            ctx = None
            old_cron = None
            current_offset = real_now

        parser = self._configs.cron_factory(cron.expression)
        # Determine the schedule offset and the first execution time.
        offset, next_run_at = self._calculate_schedule_cron(
            old_cron=old_cron,
            new_cron=cron,
            parser=parser,
            offset=current_offset,
            real_now=real_now,
        )
        job.exec_at = next_run_at
        if ctx is not None:
            # Reset the context in place so the existing job adopts the new
            # schedule; the run counter restarts from zero.
            ctx.cron = cron
            ctx.offset = offset
            ctx.run_count = 0
            ctx.cron_parser = parser
        else:
            ctx = CronContext(
                job=job,
                cron=cron,
                offset=offset,
                run_count=0,
                cron_parser=parser,
            )
            job.bind_cron_context(ctx)
        # NOTE(review): the outer middleware chain receives the persist and
        # schedule hooks via the context and is expected to invoke them —
        # confirm against the default middleware implementation.
        await self._chain_outer_middleware(
            self._create_outer_context(
                job=job,
                trigger=CronArguments(cron=cron, job_id=job_id, offset=offset),
                is_force=force,
                is_replace=replace,
                schedule_hook=lambda: self._schedule_execution_cron(ctx),
            )
        )
        return job

    async def delay(
        self,
        seconds: float,
        *,
        job_id: str | None = None,
        now: datetime | None = None,
        replace: bool = False,
    ) -> Job[ReturnT]:
        """Schedules a task to run after a delay.

        Args:
            seconds: Delay in seconds.
            job_id: Unique identifier for the job.
            now: Current time base.
            replace: Whether to replace an existing job.

        Returns:
            The scheduled Job instance.

        """
        # Thin wrapper: convert the relative delay to an absolute time for `at`.
        now = now or self.now()
        at = now + timedelta(seconds=seconds)
        return await self.at(at=at, job_id=job_id, replace=replace)

    async def at(
        self,
        at: datetime,
        *,
        job_id: str | None = None,
        replace: bool = False,
        force: bool = False,
    ) -> Job[ReturnT]:
        """Schedules a task to run at a specific time.

        Args:
            at: The scheduled datetime.
            job_id: Unique identifier for the job.
            replace: Whether to replace an existing job.
            force: Whether to force scheduling even if already scheduled at this time.

        Returns:
            The scheduled Job instance.

        """
        job_id, exists_job = self._ensure_job_id(job_id, replace=replace)
        if exists_job is not None:
            if not force and exists_job.exec_at == at:
                # Already scheduled for this exact time; nothing to do.
                logger.info(JOB_ALREADY_EXISTS.format(job_id=job_id, schedule=at))
                return exists_job
            exists_job.exec_at = at
            job = exists_job
        else:
            job = Job[ReturnT](
                exec_at=at,
                job_id=job_id,
                storage=self._configs.storage,
                unregister_hook=self._task_tracker.unregister_job,
            )
        await self._chain_outer_middleware(
            self._create_outer_context(
                job=job,
                trigger=AtArguments(job_id=job_id, at=at),
                is_force=force,
                is_replace=replace,
                schedule_hook=lambda: self._schedule_execution_at(job),
            )
        )
        return job

    async def push(self) -> Job[ReturnT]:
        """Schedules a task to run immediately.

        Returns:
            The scheduled Job instance.

        """
        # Push jobs always get a generated id; there is nothing to replace.
        job = Job[ReturnT](
            exec_at=self.now(),
            job_id=self._configs.uuid_generator().hex,
            storage=self._configs.storage,
            unregister_hook=self._task_tracker.unregister_job,
        )
        await self._chain_outer_middleware(
            self._create_outer_context(
                job=job,
                trigger=PushArguments(job_id=job.id),
                is_force=False,
                is_replace=False,
                schedule_hook=lambda: self._push_execution(job),
            )
        )
        return job

    def _schedule_execution_cron(
        self,
        ctx: CronContext[ReturnT],
    ) -> asyncio.Handle:
        """Arm an event-loop timer for the next cron run and bind the handle to the job."""
        delay_seconds = self._calculate_delay_seconds(ctx.job.exec_at)
        loop = self._configs.getloop()
        when = loop.time() + delay_seconds
        handle = loop.call_at(when, self._pre_exec_cron, ctx)
        ctx.job.bind_handle(handle)
        return handle

    def _schedule_execution_at(self, job: Job[ReturnT]) -> asyncio.Handle:
        """Arm a timer for `job.exec_at`; past-due jobs run on the next loop tick."""
        loop = self._configs.getloop()
        delay_seconds = self._calculate_delay_seconds(target_at=job.exec_at)
        if delay_seconds <= 0:
            handle = loop.call_soon(self._pre_exec_at, job)
        else:
            when = loop.time() + delay_seconds
            handle = loop.call_at(when, self._pre_exec_at, job)
        job.bind_handle(handle)
        return handle

    def _push_execution(self, job: Job[ReturnT]) -> asyncio.Handle:
        """Queue the job for immediate execution on the event loop."""
        handle = self._configs.getloop().call_soon(self._pre_exec_at, job)
        job.bind_handle(handle)
        return handle

    async def _exec_job(self, job: Job[ReturnT]) -> None:
        """Run the job through the middleware chain and record its outcome.

        Maps `JobTimeoutError` to TIMEOUT, any other exception to FAILED,
        and a normal return value to SUCCESS.
        """
        job.status = JobStatus.RUNNING
        job_context = JobContext(
            job=job,
            state=self._state,
            request_state=RequestState(),
            runnable=self._runnable,
            route_options=self.route_options,
            jobify_config=self._configs,
            schedule_builder=self,
        )
        try:
            result = await self._chain_middleware(job_context)
        except JobTimeoutError as exc:
            job.set_exception(exc, status=JobStatus.TIMEOUT)
        except Exception as exc:
            logger.exception("Job %s failed with unexpected error", job.id)
            job.set_exception(exc, status=JobStatus.FAILED)
        else:
            job.set_result(result, status=JobStatus.SUCCESS)

    def _calculate_delay_seconds(self, target_at: datetime) -> float:
        """Return seconds from now (configured tz) to `target_at`; negative if past."""
        return target_at.timestamp() - self.now().timestamp()

    async def _persist_job(
        self,
        job_id: str,
        next_run_at: datetime,
        trigger: Triggers,
    ) -> None:
        """Write the job's schedule to the configured storage backend."""
        await self._configs.storage.add_schedule(
            self._create_scheduled_job(job_id, next_run_at, trigger)
        )

    def _create_scheduled_job(
        self,
        job_id: str,
        next_run_at: datetime,
        trigger: Triggers,
    ) -> ScheduledJob:
        """Serialize the job's arguments and trigger into a storable `ScheduledJob`."""
        dumper = self._configs.dumper.dump
        params_type = self.func_spec.type_params
        # Dump each original argument using its declared parameter type.
        args_dumped = {
            name: dumper(arg, params_type[name])
            for name, arg in self._runnable.origin_arguments.items()
        }
        msg = Message(
            job_id=job_id,
            name=self.name,
            arguments=args_dumped,
            trigger=trigger,
        )
        serializer = self._configs.serializer.dumpb
        serialized_message = serializer(dumper(msg, Message))
        return ScheduledJob(
            name=self.name,
            job_id=job_id,
            message=serialized_message,
            status=JobStatus.SCHEDULED,
            next_run_at=next_run_at,
        )

    def _ensure_job_id(
        self,
        job_id: str | None,
        *,
        replace: bool,
    ) -> tuple[str, Job[ReturnT] | None]:
        """Resolve a job id and look up any pending job registered under it.

        Returns:
            The (possibly generated) id, plus the existing pending job when
            `replace` is set.

        Raises:
            DuplicateJobError: If the id is already pending and `replace` is False.

        """
        job_id = job_id or self._configs.uuid_generator().hex
        if job := self._task_tracker.pending_jobs.get(job_id):
            if replace is True:
                return (job_id, job)

            raise DuplicateJobError(job_id)

        return (job_id, None)

    def _create_outer_context(
        self,
        *,
        job: Job[ReturnT],
        trigger: Triggers,
        is_force: bool,
        is_replace: bool,
        schedule_hook: Callable[[], asyncio.Handle],
    ) -> OuterContext:
        """Assemble the `OuterContext` handed to the outer middleware chain."""
        return OuterContext(
            job=job,
            state=self._state,
            trigger=trigger,
            runnable=self._runnable,
            arguments=self._runnable.bound.arguments,
            func_spec=self.func_spec,
            is_force=is_force,
            is_persist=self._is_persist,
            is_replace=is_replace,
            route_options=self.route_options,
            jobify_config=self._configs,
            request_state=RequestState(),
            persist_job_hook=self._persist_job,
            schedule_hook=schedule_hook,
            schedule_builder=self,
        )

    def _calculate_schedule_cron(
        self,
        *,
        old_cron: Cron | None,
        new_cron: Cron,
        parser: CronParser,
        offset: datetime,
        real_now: datetime,
    ) -> tuple[datetime, datetime]:
        """Determine the schedule offset and next run time for a cron job.

        A new or changed `start_date` takes priority as the first run time;
        otherwise the parser advances from `offset`. The candidate time is
        then filtered through the misfire policy against `real_now`.

        Returns:
            Tuple of (offset, next_run_at).

        """
        start_date_changed = old_cron is None or (
            old_cron.start_date != new_cron.start_date
        )
        if start_date_changed and new_cron.start_date is not None:
            target_at = new_cron.start_date
            offset = target_at
        else:
            target_at = parser.next_run(now=offset)

        next_run_at = handle_misfire_policy(
            cron_parser=parser,
            next_run_at=target_at,
            real_now=real_now,
            policy=new_cron.misfire_policy,
        )
        return (offset, next_run_at)

    def _pre_exec_at(self, job: Job[ReturnT]) -> None:
        """Spawn the execution task for a one-shot job and track it."""
        task = asyncio.create_task(self._exec_at(job), name=job.id)
        self._task_tracker.track_task(job.id, task, job._event)

    async def _exec_at(self, job: Job[ReturnT]) -> None:
        """Run a one-shot job, then drop its persisted schedule and tracker entry."""
        await self._exec_job(job)
        if self._is_persist:
            await self._configs.storage.delete_schedule(job.id)
        self._task_tracker.unregister_job(job.id)

    def _pre_exec_cron(self, ctx: CronContext[ReturnT]) -> None:
        """Spawn the execution task for one cron occurrence and track it."""
        task = asyncio.create_task(self._exec_cron(ctx=ctx), name=ctx.job.id)
        self._task_tracker.track_task(ctx.job.id, task, ctx.job._event)

    async def _exec_cron(self, ctx: CronContext[ReturnT]) -> None:
        """Run one cron occurrence, then reschedule or retire the job.

        Updates the failure/run counters, persists the next occurrence when
        persistence is enabled, and unregisters the job when it will not
        run again.
        """
        job = ctx.job
        await self._exec_job(job)

        if job.status is JobStatus.SUCCESS:
            ctx.failure_count = 0
        else:
            ctx.failure_count += 1
        ctx.run_count += 1

        if ctx.is_run_exceeded_by_limit():
            # Run limit reached: the schedule is finished for good.
            if self._is_persist:
                await self._configs.storage.delete_schedule(job.id)
        elif job.is_reschedulable() and self._configs.app_started:
            if ctx.is_failure_allowed_by_limit():
                # Advance the offset to this run's slot and compute the next one.
                offset = ctx.offset = job.exec_at
                next_run_at = ctx.cron_parser.next_run(now=offset)

                if self._is_persist:
                    trigger = CronArguments(
                        job_id=job.id,
                        cron=ctx.cron,
                        offset=offset,
                        run_count=ctx.run_count,
                    )
                    await self._persist_job(job.id, next_run_at, trigger)
                job.update(exec_at=next_run_at, status=JobStatus.SCHEDULED)
                self._schedule_execution_cron(ctx)
                # The job stays registered: it will run again.
                return

            job.status = JobStatus.PERMANENTLY_FAILED
            logger.info(
                "Job %s stopped due to max failures policy (%s/%s)",
                job.id,
                ctx.failure_count,
                ctx.cron.max_failures,
            )
        self._task_tracker.unregister_job(job.id)

    def _cron(  # noqa: PLR0913
        self,
        *,
        cron: Cron,
        job_id: str,
        offset: datetime,
        next_run_at: datetime,
        cron_parser: CronParser,
        run_count: int,
    ) -> None:
        """Register and arm a cron job directly, bypassing the outer middleware chain."""
        job = Job[ReturnT](
            exec_at=next_run_at,
            job_id=job_id,
            unregister_hook=self._task_tracker.unregister_job,
            storage=self._configs.storage,
        )
        ctx = CronContext(
            job=job,
            cron=cron,
            offset=offset,
            cron_parser=cron_parser,
            run_count=run_count,
        )
        job.bind_cron_context(ctx)
        self._task_tracker.register_job(job)
        self._schedule_execution_cron(ctx)

    def _at(self, at: datetime, job_id: str) -> None:
        """Register and arm a one-shot job directly, bypassing the outer middleware chain."""
        job = Job[ReturnT](
            exec_at=at,
            job_id=job_id,
            unregister_hook=self._task_tracker.unregister_job,
            storage=self._configs.storage,
        )
        self._task_tracker.register_job(job)
        self._schedule_execution_at(job)

    def _push(self, job_id: str, exec_at: datetime) -> None:
        """Register a job and queue it for immediate execution, bypassing outer middleware."""
        job = Job[ReturnT](
            job_id=job_id,
            exec_at=exec_at,
            storage=self._configs.storage,
            unregister_hook=self._task_tracker.unregister_job,
        )
        self._task_tracker.register_job(job)
        handle = self._configs.getloop().call_soon(self._pre_exec_at, job)
        job.bind_handle(handle)

__init__(*, name, state, task_tracker, jobify_config, runnable, chain_middleware, chain_outer_middleware, func_spec, exception_handlers, options, is_persist)

Initialize the ScheduleBuilder.

Parameters:

Name Type Description Default
name str

The task name.

required
state State

Application state.

required
task_tracker TaskTracker

Tracker for scheduled tasks.

required
jobify_config JobifyConfiguration

Jobify configuration.

required
runnable Runnable[ReturnT]

The runnable task.

required
chain_middleware CallNext

Middleware for task execution.

required
chain_outer_middleware CallNextOuter

Middleware for scheduling.

required
func_spec FuncSpec[ReturnT]

Function specification.

required
exception_handlers ExceptionHandlers

Exception handlers.

required
options RouteOptions

Route options.

required
is_persist bool

Whether the schedule should be persisted.

required
Source code in src/jobify/_internal/scheduler/scheduler.py
def __init__(  # noqa: PLR0913
    self,
    *,
    name: str,
    state: State,
    task_tracker: TaskTracker,
    jobify_config: JobifyConfiguration,
    runnable: Runnable[ReturnT],
    chain_middleware: CallNext,
    chain_outer_middleware: CallNextOuter,
    func_spec: FuncSpec[ReturnT],
    exception_handlers: ExceptionHandlers,
    options: RouteOptions,
    is_persist: bool,
) -> None:
    """Initialize the ScheduleBuilder.

    Args:
        name: The task name.
        state: Application state.
        task_tracker: Tracker for scheduled tasks.
        jobify_config: Jobify configuration.
        runnable: The runnable task.
        chain_middleware: Middleware for task execution.
        chain_outer_middleware: Middleware for scheduling.
        func_spec: Function specification.
        exception_handlers: Exception handlers.
        options: Route options.
        is_persist: Whether the schedule should be persisted.

    """
    self._state: State = state
    self._task_tracker: TaskTracker = task_tracker
    self._configs: JobifyConfiguration = jobify_config
    self._runnable: Runnable[ReturnT] = runnable
    self._chain_middleware: CallNext = chain_middleware
    self._chain_outer_middleware: CallNextOuter = chain_outer_middleware
    self._exception_handlers: ExceptionHandlers = exception_handlers
    # Gates add/delete of persisted schedules in the scheduling paths.
    self._is_persist: bool = is_persist
    self.func_spec: FuncSpec[ReturnT] = func_spec
    self.route_options: RouteOptions = options
    self.name: str = name

at(at, *, job_id=None, replace=False, force=False) async

Schedules a task to run at a specific time.

Parameters:

Name Type Description Default
at datetime

The scheduled datetime.

required
job_id str | None

Unique identifier for the job.

None
replace bool

Whether to replace an existing job.

False
force bool

Whether to force scheduling even if already scheduled at this time.

False

Returns:

Type Description
Job[ReturnT]

The scheduled Job instance.

Source code in src/jobify/_internal/scheduler/scheduler.py
async def at(
    self,
    at: datetime,
    *,
    job_id: str | None = None,
    replace: bool = False,
    force: bool = False,
) -> Job[ReturnT]:
    """Schedules a task to run at a specific time.

    Args:
        at: The scheduled datetime.
        job_id: Unique identifier for the job.
        replace: Whether to replace an existing job.
        force: Whether to force scheduling even if already scheduled at this time.

    Returns:
        The scheduled Job instance.

    """
    job_id, exists_job = self._ensure_job_id(job_id, replace=replace)
    if exists_job is not None:
        if not force and exists_job.exec_at == at:
            # Already scheduled for this exact time; nothing to do.
            logger.info(JOB_ALREADY_EXISTS.format(job_id=job_id, schedule=at))
            return exists_job
        # Reuse the pending job, only moving its execution time.
        exists_job.exec_at = at
        job = exists_job
    else:
        job = Job[ReturnT](
            exec_at=at,
            job_id=job_id,
            storage=self._configs.storage,
            unregister_hook=self._task_tracker.unregister_job,
        )
    # The outer middleware receives the schedule hook that arms the loop timer.
    await self._chain_outer_middleware(
        self._create_outer_context(
            job=job,
            trigger=AtArguments(job_id=job_id, at=at),
            is_force=force,
            is_replace=replace,
            schedule_hook=lambda: self._schedule_execution_at(job),
        )
    )
    return job

cron(cron, *, job_id, replace=False, force=False) async

Schedules a task based on a cron expression.

Parameters:

Name Type Description Default
cron str | Cron

Cron expression string or object.

required
job_id str

Unique identifier for the job.

required
replace bool

Whether to replace an existing job if it exists.

False
force bool

Whether to force scheduling even if parameters match.

False

Returns:

Type Description
Job[ReturnT]

The scheduled Job instance.

Source code in src/jobify/_internal/scheduler/scheduler.py
async def cron(
    self,
    cron: str | Cron,
    *,
    job_id: str,
    replace: bool = False,
    force: bool = False,
) -> Job[ReturnT]:
    """Schedules a task based on a cron expression.

    Args:
        cron: Cron expression string or object.
        job_id: Unique identifier for the job.
        replace: Whether to replace an existing job if it exists.
        force: Whether to force scheduling even if parameters match.

    Returns:
        The scheduled Job instance.

    """
    job_id, exists_job = self._ensure_job_id(job_id, replace=replace)
    # Normalize a bare expression string into a Cron config with defaults.
    if isinstance(cron, str):
        cron = Cron(cron)

    real_now = self.now()

    if exists_job is not None:
        job = exists_job
        ctx = cast("CronContext[ReturnT]", exists_job._cron_context)

        # Identical cron config and no force flag: keep the existing job.
        # NOTE(review): `at` logs the equivalent already-exists case at INFO
        # level — confirm the WARNING level here is intentional.
        if not force and cron == ctx.cron:
            logger.warning(JOB_ALREADY_EXISTS.format(job_id=job_id, schedule=cron))
            return exists_job

        # Carry the previous config and offset forward so the next run can
        # be computed relative to the old schedule.
        old_cron = ctx.cron
        current_offset = ctx.offset
    else:
        job = Job[ReturnT](
            exec_at=real_now,
            job_id=job_id,
            unregister_hook=self._task_tracker.unregister_job,
            storage=self._configs.storage,
        )
        ctx = None
        old_cron = None
        current_offset = real_now

    parser = self._configs.cron_factory(cron.expression)
    offset, next_run_at = self._calculate_schedule_cron(
        old_cron=old_cron,
        new_cron=cron,
        parser=parser,
        offset=current_offset,
        real_now=real_now,
    )
    job.exec_at = next_run_at
    if ctx is not None:
        # Mutate the existing context in place; run_count restarts at zero
        # for the new schedule.
        ctx.cron = cron
        ctx.offset = offset
        ctx.run_count = 0
        ctx.cron_parser = parser
    else:
        ctx = CronContext(
            job=job,
            cron=cron,
            offset=offset,
            run_count=0,
            cron_parser=parser,
        )
        job.bind_cron_context(ctx)
    await self._chain_outer_middleware(
        self._create_outer_context(
            job=job,
            trigger=CronArguments(cron=cron, job_id=job_id, offset=offset),
            is_force=force,
            is_replace=replace,
            schedule_hook=lambda: self._schedule_execution_cron(ctx),
        )
    )
    return job

delay(seconds, *, job_id=None, now=None, replace=False) async

Schedules a task to run after a delay.

Parameters:

Name Type Description Default
seconds float

Delay in seconds.

required
job_id str | None

Unique identifier for the job.

None
now datetime | None

Current time base.

None
replace bool

Whether to replace an existing job.

False

Returns:

Type Description
Job[ReturnT]

The scheduled Job instance.

Source code in src/jobify/_internal/scheduler/scheduler.py
async def delay(
    self,
    seconds: float,
    *,
    job_id: str | None = None,
    now: datetime | None = None,
    replace: bool = False,
) -> Job[ReturnT]:
    """Schedules a task to run once after a relative delay.

    Args:
        seconds: Delay in seconds.
        job_id: Unique identifier for the job.
        now: Current time base.
        replace: Whether to replace an existing job.

    Returns:
        The scheduled Job instance.

    """
    # Fall back to the scheduler clock when no explicit time base is given.
    base = now or self.now()
    target = base + timedelta(seconds=seconds)
    return await self.at(at=target, job_id=job_id, replace=replace)

now()

Return the current datetime in the configured timezone.

Returns:

Type Description
datetime

Current datetime.

Source code in src/jobify/_internal/scheduler/scheduler.py
def now(self) -> datetime:
    """Return the current moment as a timezone-aware datetime.

    Returns:
        Current datetime in the scheduler's configured timezone.

    """
    tz = self._configs.tz
    return datetime.now(tz=tz)

push() async

Schedules a task to run immediately.

Returns:

Type Description
Job[ReturnT]

The scheduled Job instance.

Source code in src/jobify/_internal/scheduler/scheduler.py
async def push(self) -> Job[ReturnT]:
    """Schedules a task to run immediately.

    Returns:
        The scheduled Job instance.

    """
    # Immediate execution: exec_at is the current scheduler time, and the
    # job id is generated rather than caller-supplied.
    job = Job[ReturnT](
        exec_at=self.now(),
        job_id=self._configs.uuid_generator().hex,
        storage=self._configs.storage,
        unregister_hook=self._task_tracker.unregister_job,
    )
    outer_context = self._create_outer_context(
        job=job,
        trigger=PushArguments(job_id=job.id),
        is_force=False,
        is_replace=False,
        schedule_hook=lambda: self._push_execution(job),
    )
    await self._chain_outer_middleware(outer_context)
    return job

SmartRetry

Bases: NamedTuple

Immutable configuration and delay logic for retrying failed operations.

Uses exponential backoff with optional equal jitter to spread retry load. Designed as a value object: cheap to copy, safe to share across threads.

Attributes:

Name Type Description
retries int

Number of retries that should be performed after the first failure. Must be >= 0. A value of 0 means no retries.

initial_delay float

Base delay in seconds before the first retry.

max_delay float

Upper bound on computed delay, regardless of backoff growth.

backoff_factor float

Multiplier applied per retry. 1.0 gives constant delay, 2.0 gives classic exponential backoff.

jitter bool

If True, randomises delay in [delay/2, delay] to avoid thundering-herd. If False, delay is deterministic.

include_exceptions tuple[type[Exception], ...]

Exception types that trigger a retry. Defaults to (Exception,) — retries on anything.

exclude_exceptions tuple[type[Exception], ...]

Exception types that are re-raised immediately, even if they match include_exceptions. Takes priority.

Source code in src/jobify/_internal/configuration.py
class SmartRetry(NamedTuple):
    """Immutable configuration and delay logic for retrying failed operations.

    Uses exponential backoff with optional equal jitter to spread retry load.
    Designed as a value object: cheap to copy, safe to share across threads.

    Attributes:
        retries: Number of retries that should be performed after the first failure.
            Must be >= 0. A value of 0 means no retries.
        initial_delay: Base delay in seconds before the first retry.
        max_delay: Upper bound on computed delay, regardless of backoff growth.
        backoff_factor: Multiplier applied per retry. ``1.0`` gives constant
            delay, ``2.0`` gives classic exponential backoff.
        jitter: If ``True``, randomises delay in ``[delay/2, delay]`` to avoid
            thundering-herd. If ``False``, delay is deterministic.
        include_exceptions: Exception types that trigger a retry.
            Defaults to ``(Exception,)`` — retries on anything.
        exclude_exceptions: Exception types that are re-raised immediately,
            even if they match ``include_exceptions``. Takes priority.

    """

    retries: int
    initial_delay: float = 0.5
    max_delay: float = 60.0
    backoff_factor: float = 2.0
    jitter: bool = True
    include_exceptions: tuple[type[Exception], ...] = (Exception,)
    exclude_exceptions: tuple[type[Exception], ...] = ()

    def compute_delay(self, attempt: int) -> float:
        """Return the delay in seconds before the given retry attempt.

        Args:
            attempt: 1-based retry attempt number.

        Returns:
            The backoff delay, capped at ``max_delay``. With ``jitter``
            enabled the result is drawn uniformly from ``[delay/2, delay]``.

        """
        # Exponential growth capped by max_delay: initial * factor^(attempt-1).
        delay = min(
            self.initial_delay * self.backoff_factor ** (attempt - 1),
            self.max_delay,
        )
        if self.jitter:
            # Equal jitter: keep at least half the delay, randomise the rest.
            delay /= 2
            return delay + random.uniform(0, delay)  # nosec # noqa: S311
        return delay

State

Bases: UserDict[str, Any]

An object that can be used to store arbitrary state.

This class provides dictionary-like access to state data, allowing for both key-based and attribute-based access.

Parameters:

Name Type Description Default
state dict[str, Any] | None

Initial state dictionary.

None
Source code in src/jobify/_internal/common/datastructures.py
class State(UserDict[str, Any]):
    """An object that can be used to store arbitrary state.

    This class provides dictionary-like access to state data, allowing for both
    key-based and attribute-based access.

    Args:
        state: Initial state dictionary.

    """

    data: dict[str, Any]
    __slots__: tuple[str] = ("data",)

    def __init__(self, state: dict[str, Any] | None = None) -> None:  # pyright: ignore[reportMissingSuperCall]
        object.__setattr__(self, "data", state or {})

    @override
    def __setattr__(self, key: str, value: Any) -> None:
        self[key] = value

    def __getattr__(self, key: str) -> Any:
        try:
            return self.data[key]
        except KeyError as exc:
            message = f"{self.__class__.__name__!r} object has no attribute {key!r}"
            raise AttributeError(message) from exc

    @override
    def __delattr__(self, key: str) -> None:
        del self[key]

    @override
    def __str__(self) -> str:
        cls_name = type(self).__name__
        return f"{cls_name}({super().__str__()})"

jobify.serializers

Serializers module for Jobify.

This module provides serialization utilities for task data and results. Available serializers implement the Serializer protocol.

Classes:

Name Description
Serializer

Abstract base protocol defining serializer interface

JSONSerializer

Standard JSON serializer

ExtendedJSONSerializer

JSON serializer supporting extended types

CBORSerializer

CBOR binary format serializer

MsgpackSerializer

Msgpack binary format serializer

OrjsonSerializer

High-performance JSON serializer (using orjson)

UnsafePickleSerializer

Pickle-based serializer (use with caution)

Protocol Interface

dumpb(value: Any) -> bytes: Serialize object to bytes loadb(value: bytes) -> Any: Deserialize bytes to object

Security Notes
  • JSON, CBOR, Msgpack, Orjson: Generally safe for data exchange.
  • UnsafePickleSerializer: UNSAFE for untrusted data - allows arbitrary code execution.

Serializer

Bases: Protocol

Interface for serializing and deserializing job messages.

Source code in src/jobify/_internal/serializers/base.py
class Serializer(Protocol, metaclass=ABCMeta):
    """Interface for serializing and deserializing job messages."""

    @abstractmethod
    def dumpb(self, data: Any) -> bytes:  # noqa: ANN401
        """Serialize data to bytes.

        Args:
            data: The value to serialize.

        Returns:
            The serialized byte representation of ``data``.

        """
        raise NotImplementedError

    @abstractmethod
    def loadb(self, data: bytes) -> Any:  # noqa: ANN401
        """Deserialize data from bytes.

        Args:
            data: The byte payload to deserialize.

        Returns:
            The deserialized Python object.

        """
        raise NotImplementedError

dumpb(data) abstractmethod

Serialize data to bytes.

Source code in src/jobify/_internal/serializers/base.py
@abstractmethod
def dumpb(self, data: Any) -> bytes:  # noqa: ANN401
    """Serialize data to bytes.

    Args:
        data: The value to serialize.

    Returns:
        The serialized byte representation of ``data``.

    """
    raise NotImplementedError

loadb(data) abstractmethod

Deserialize data from bytes.

Source code in src/jobify/_internal/serializers/base.py
@abstractmethod
def loadb(self, data: bytes) -> Any:  # noqa: ANN401
    """Deserialize data from bytes.

    Args:
        data: The byte payload to deserialize.

    Returns:
        The deserialized Python object.

    """
    raise NotImplementedError

CBORSerializer

Bases: Serializer

Serialize and deserialize data using the cbor2 library.

See https://cbor2.readthedocs.io/en/stable/ for more information.

Source code in src/jobify/serializers/cbor.py
class CBORSerializer(Serializer):
    """Serialize and deserialize data using the cbor2 library.

    See https://cbor2.readthedocs.io/en/stable/ for more information.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        # Encoder options
        datetime_as_timestamp: bool = False,
        timezone: datetime.tzinfo | None = None,
        value_sharing: bool = False,
        encoders: Mapping[type, "cbor2.EncoderHook"] | None = None,
        default: "cbor2.EncoderHook | None" = None,
        canonical: bool = False,
        date_as_datetime: bool = False,
        string_referencing: bool = False,
        indefinite_containers: bool = False,
        # Decoder options
        tag_hook: "cbor2.TagHook | None" = None,
        object_hook: "cbor2.ObjectHook | None" = None,
        semantic_decoders: SemanticDecoders = None,
        str_errors: str = "strict",
        max_depth: int = 400,
        allow_indefinite: bool = True,
        immutable: bool = False,
    ) -> None:
        """Initialize the CBOR serializer with encoder and decoder options.

        Args:
            datetime_as_timestamp: Serialize datetimes as UNIX timestamps
                (more compact on the wire, but timezone info is lost).
            timezone: Default timezone applied to naive datetimes; without
                it, encoding a naive datetime raises ``ValueError``.
            value_sharing: Support repeated values and cyclic data
                structures, at the cost of extra overhead.
            encoders: Per-type encoder hooks overriding the default
                encoding for those types.
            default: Fallback encoder hook called when no suitable encoder
                is found for a value.
            canonical: Emit canonical CBOR (e.g. sorted maps/sets) so
                serializations are comparable without decoding.
            date_as_datetime: Encode ``date`` objects as datetimes
                (CBOR tag 0), matching cbor2 <= 4.1.2 behavior.
            string_referencing: Enable more efficient serialization of
                repeated string values.
            indefinite_containers: Encode containers as indefinite-length
                using a stop code instead of an explicit length prefix.
            tag_hook: Decoder hook for CBOR tags with no built-in decoder;
                its return value replaces the ``CBORTag`` in output.
            object_hook: Decoder hook called for each deserialized
                ``dict``; its return value replaces the dict in output.
            semantic_decoders: Per-tag decoder callbacks overriding the
                default decoding for those semantic tags.
            str_errors: Unicode error handler for string decoding
                (e.g. ``"strict"``, ``"replace"``, ``"ignore"``).
            max_depth: Maximum allowed nesting depth for containers.
            allow_indefinite: If ``False``, raise ``CBORDecodeError`` on
                indefinite-length strings or containers in the input.
            immutable: Return immutable types (``tuple``, ``frozenset``)
                instead of mutable ones (``list``, ``set``).

        Raises:
            ImportError: If cbor2 is not installed.

        """
        # Fail fast with an install hint when the optional dependency is
        # missing.
        if cbor2 is UNSET:  # pragma: no cover
            msg = "cbor2 is required: `uv add jobify[cbor2]`"
            raise ImportError(msg)

        # Encoder configuration.
        self.datetime_as_timestamp = datetime_as_timestamp
        self.timezone = timezone
        self.value_sharing = value_sharing
        self.encoders = encoders
        self.default = default
        self.canonical = canonical
        self.date_as_datetime = date_as_datetime
        self.string_referencing = string_referencing
        self.indefinite_containers = indefinite_containers
        # Decoder configuration.
        self.tag_hook = tag_hook
        self.object_hook = object_hook
        self.semantic_decoders = semantic_decoders
        self.str_errors = str_errors
        self.max_depth = max_depth
        self.allow_indefinite = allow_indefinite
        self.immutable = immutable

    @override
    def dumpb(self, data: JSONCompat) -> bytes:
        """Serialize data to CBOR bytes.

        Args:
            data: The value to serialize.

        Returns:
            CBOR-encoded bytes.

        """
        encode_options: dict[str, Any] = {
            "datetime_as_timestamp": self.datetime_as_timestamp,
            "timezone": self.timezone,
            "value_sharing": self.value_sharing,
            "encoders": self.encoders,
            "default": self.default,
            "canonical": self.canonical,
            "date_as_datetime": self.date_as_datetime,
            "string_referencing": self.string_referencing,
            "indefinite_containers": self.indefinite_containers,
        }
        return cbor2.dumps(data, **encode_options)

    @override
    def loadb(self, data: bytes) -> JSONCompat:
        """Deserialize CBOR bytes to a Python object.

        Args:
            data: CBOR-encoded bytes to deserialize.

        Returns:
            The deserialized Python object.

        """
        decode_options: dict[str, Any] = {
            "tag_hook": self.tag_hook,
            "object_hook": self.object_hook,
            "semantic_decoders": self.semantic_decoders,
            "str_errors": self.str_errors,
            "max_depth": self.max_depth,
            "allow_indefinite": self.allow_indefinite,
            "immutable": self.immutable,
        }
        result: JSONCompat = cbor2.loads(data, **decode_options)
        return result

__init__(*, datetime_as_timestamp=False, timezone=None, value_sharing=False, encoders=None, default=None, canonical=False, date_as_datetime=False, string_referencing=False, indefinite_containers=False, tag_hook=None, object_hook=None, semantic_decoders=None, str_errors='strict', max_depth=400, allow_indefinite=True, immutable=False)

Initialize the CBOR serializer with encoder and decoder options.

Parameters:

Name Type Description Default
datetime_as_timestamp bool

Serialize datetimes as UNIX timestamps. Makes datetimes more concise on the wire, but loses timezone info.

False
timezone tzinfo | None

Default timezone for naive datetimes. If not set, naive datetimes raise ValueError during encoding.

None
value_sharing bool

Allow efficient serialization of repeated values and cyclic data structures, at the cost of extra overhead.

False
encoders Mapping[type, EncoderHook] | None

Mapping of Python types to encoder hooks, overriding the default encoding for those types.

None
default EncoderHook | None

Fallback encoder hook called when no suitable encoder is found for a value.

None
canonical bool

Use canonical CBOR representation (e.g. sorted maps/sets), ensuring serializations are comparable without decoding.

False
date_as_datetime bool

Serialize date objects as datetimes (CBOR tag 0). This was the default behavior in cbor2 <= 4.1.2.

False
string_referencing bool

Allow more efficient serialization of repeated string values.

False
indefinite_containers bool

Encode containers as indefinite-length using a stop code instead of an explicit length prefix.

False
tag_hook TagHook | None

Decoder hook for CBOR tags with no built-in decoder. Called with the CBORTag; its return value replaces the tag in output.

None
object_hook ObjectHook | None

Decoder hook called for each deserialized dict. Its return value replaces the dict in output.

None
semantic_decoders SemanticDecoders

Mapping of semantic tag numbers to decoder callbacks, overriding the default decoding for those tags.

None
str_errors str

Unicode error handler for string decoding (e.g. "strict", "replace", "ignore").

'strict'
max_depth int

Maximum allowed nesting depth for containers.

400
allow_indefinite bool

If False, raise CBORDecodeError on indefinite-length strings or containers in the input.

True
immutable bool

Return immutable types (tuple, frozenset) instead of mutable ones (list, set).

False

Raises:

Type Description
ImportError

If cbor2 is not installed.

Source code in src/jobify/serializers/cbor.py
def __init__(  # noqa: PLR0913
    self,
    *,
    # Encoder options
    datetime_as_timestamp: bool = False,
    timezone: datetime.tzinfo | None = None,
    value_sharing: bool = False,
    encoders: Mapping[type, "cbor2.EncoderHook"] | None = None,
    default: "cbor2.EncoderHook | None" = None,
    canonical: bool = False,
    date_as_datetime: bool = False,
    string_referencing: bool = False,
    indefinite_containers: bool = False,
    # Decoder options
    tag_hook: "cbor2.TagHook | None" = None,
    object_hook: "cbor2.ObjectHook | None" = None,
    semantic_decoders: SemanticDecoders = None,
    str_errors: str = "strict",
    max_depth: int = 400,
    allow_indefinite: bool = True,
    immutable: bool = False,
) -> None:
    """Initialize the CBOR serializer with encoder and decoder options.

    Args:
        datetime_as_timestamp: Serialize datetimes as UNIX timestamps.
            Makes datetimes more concise on the wire, but loses timezone info.
        timezone: Default timezone for naive datetimes. If not set, naive
            datetimes raise ``ValueError`` during encoding.
        value_sharing: Allow efficient serialization of repeated values and
            cyclic data structures, at the cost of extra overhead.
        encoders: Mapping of Python types to encoder hooks, overriding the
            default encoding for those types.
        default: Fallback encoder hook called when no suitable encoder is found
            for a value.
        canonical: Use canonical CBOR representation (e.g. sorted maps/sets),
            ensuring serializations are comparable without decoding.
        date_as_datetime: Serialize ``date`` objects as datetimes (CBOR tag 0).
            This was the default behavior in cbor2 <= 4.1.2.
        string_referencing: Allow more efficient serialization of repeated
            string values.
        indefinite_containers: Encode containers as indefinite-length using a
            stop code instead of an explicit length prefix.
        tag_hook: Decoder hook for CBOR tags with no built-in decoder. Called
            with the ``CBORTag``; its return value replaces the tag in output.
        object_hook: Decoder hook called for each deserialized ``dict``. Its
            return value replaces the dict in output.
        semantic_decoders: Mapping of semantic tag numbers to decoder callbacks,
            overriding the default decoding for those tags.
        str_errors: Unicode error handler for string decoding (e.g.
            ``"strict"``, ``"replace"``, ``"ignore"``).
        max_depth: Maximum allowed nesting depth for containers.
        allow_indefinite: If ``False``, raise ``CBORDecodeError`` on
            indefinite-length strings or containers in the input.
        immutable: Return immutable types (``tuple``, ``frozenset``) instead of
            mutable ones (``list``, ``set``).

    Raises:
        ImportError: If cbor2 is not installed.

    """
    # Fail fast with an install hint when the optional cbor2 dependency is
    # missing.
    if cbor2 is UNSET:  # pragma: no cover
        msg = "cbor2 is required: `uv add jobify[cbor2]`"
        raise ImportError(msg)

    # Encoder configuration.
    self.datetime_as_timestamp = datetime_as_timestamp
    self.timezone = timezone
    self.value_sharing = value_sharing
    self.encoders = encoders
    self.default = default
    self.canonical = canonical
    self.date_as_datetime = date_as_datetime
    self.string_referencing = string_referencing
    self.indefinite_containers = indefinite_containers
    # Decoder configuration.
    self.tag_hook = tag_hook
    self.object_hook = object_hook
    self.semantic_decoders = semantic_decoders
    self.str_errors = str_errors
    self.max_depth = max_depth
    self.allow_indefinite = allow_indefinite
    self.immutable = immutable

dumpb(data)

Serialize data to CBOR bytes.

Parameters:

Name Type Description Default
data JSONCompat

The value to serialize.

required

Returns:

Type Description
bytes

CBOR-encoded bytes.

Source code in src/jobify/serializers/cbor.py
@override
def dumpb(self, data: JSONCompat) -> bytes:
    """Serialize data to CBOR bytes.

    Args:
        data: The value to serialize.

    Returns:
        CBOR-encoded bytes.

    """
    # Forward the encoder options captured at construction time on every call.
    return cbor2.dumps(
        data,
        datetime_as_timestamp=self.datetime_as_timestamp,
        timezone=self.timezone,
        value_sharing=self.value_sharing,
        encoders=self.encoders,
        default=self.default,
        canonical=self.canonical,
        date_as_datetime=self.date_as_datetime,
        string_referencing=self.string_referencing,
        indefinite_containers=self.indefinite_containers,
    )

loadb(data)

Deserialize CBOR bytes to a Python object.

Parameters:

Name Type Description Default
data bytes

CBOR-encoded bytes to deserialize.

required

Returns:

Type Description
JSONCompat

The deserialized Python object.

Source code in src/jobify/serializers/cbor.py
@override
def loadb(self, data: bytes) -> JSONCompat:
    """Deserialize CBOR bytes to a Python object.

    Args:
        data: CBOR-encoded bytes to deserialize.

    Returns:
        The deserialized Python object.

    """
    # Forward the decoder options captured at construction time on every call.
    r: JSONCompat = cbor2.loads(
        data,
        tag_hook=self.tag_hook,
        object_hook=self.object_hook,
        semantic_decoders=self.semantic_decoders,
        str_errors=self.str_errors,
        max_depth=self.max_depth,
        allow_indefinite=self.allow_indefinite,
        immutable=self.immutable,
    )
    return r

MsgpackSerializer

Bases: Serializer

Serialize and deserialize data using the msgpack library.

See https://msgpack-python.readthedocs.io for more information.

Source code in src/jobify/serializers/msgpack.py
class MsgpackSerializer(Serializer):
    """Serialize and deserialize data using the msgpack library.

    See https://msgpack-python.readthedocs.io for more information.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        # Packer options
        default: Callable[[Any], Any] | None = None,
        use_bin_type: bool = True,
        strict_types: bool = False,
        datetime: bool = False,
        unicode_errors: str | None = None,
        # Unpacker options
        raw: bool = False,
        timestamp: int = 0,
        strict_map_key: bool = True,
        use_list: bool = True,
        object_hook: Callable[[dict[Any, Any]], Any] | None = None,
        object_pairs_hook: Callable[[list[tuple[Any, Any]]], Any] | None = None,
        list_hook: Callable[[list[Any]], Any] | None = None,
        ext_hook: Callable[[int, bytes], Any] | None = None,
        max_str_len: int = -1,
        max_bin_len: int = -1,
        max_array_len: int = -1,
        max_map_len: int = -1,
        max_ext_len: int = -1,
    ) -> None:
        """Initialize the MessagePack serializer with packer and unpacker options.

        Args:
            default: Callable invoked for types not natively supported by
                msgpack. Should return a msgpack-serializable value or raise
                ``TypeError`` if the type cannot be handled.
            use_bin_type: Use the msgpack 2.0 ``bin`` type for ``bytes``
                objects. Also enables ``str8`` type for unicode. Should be
                ``True`` (default) for all modern use; set to ``False`` only
                for compatibility with very old msgpack consumers.
            strict_types: If ``True``, only exact types are serialized;
                subclasses are forwarded to ``default``. Also prevents tuples
                from being serialized as arrays.
            datetime: If ``True``, ``datetime`` objects with ``tzinfo`` are
                packed as the msgpack ``Timestamp`` ext type. The timezone
                offset is stripped; use ``timestamp=3`` in unpacker options
                to recover UTC ``datetime`` on the other end.
            unicode_errors: Error handler for encoding unicode strings
                (e.g. ``"strict"``, ``"replace"``, ``"ignore"``). Avoid
                unless you have a specific reason to handle bad unicode.
            raw: If ``True``, unpack msgpack ``raw`` bytes to Python ``bytes``
                instead of decoding to ``str``. Useful when round-tripping
                data packed with ``use_bin_type=False``.
            timestamp: Controls how msgpack ``Timestamp`` ext type is
                deserialized:

                - ``0`` — return a ``msgpack.Timestamp`` object (default).
                - ``1`` — return a ``float`` (seconds since epoch).
                - ``2`` — return an ``int`` (nanoseconds since epoch).
                - ``3`` — return a UTC-aware ``datetime.datetime``.

            strict_map_key: If ``True`` (default), only ``str`` or ``bytes``
                are accepted as map keys, preventing hash-DoS attacks from
                untrusted input.
            use_list: If ``True`` (default), unpack msgpack arrays to Python
                ``list``. If ``False``, unpack to ``tuple``.
            object_hook: Callable invoked with each deserialized ``dict``.
                Its return value replaces the dict in output.
            object_pairs_hook: Callable invoked with a list of ``(key, value)``
                pairs for each deserialized map. Mutually exclusive with
                ``object_hook``.
            list_hook: Callable invoked with each deserialized ``list``. Its
                return value replaces the list in output.
            ext_hook: Callable invoked for ext types with no built-in decoder.
                Receives ``(code, data)`` and should return a Python object.
                Defaults to returning ``msgpack.ExtType(code, data)``.
            max_str_len: Maximum allowed byte length for ``str`` values.
                ``-1`` means no limit.
            max_bin_len: Maximum allowed byte length for ``bytes`` values.
                ``-1`` means no limit.
            max_array_len: Maximum allowed number of elements in arrays.
                ``-1`` means no limit.
            max_map_len: Maximum allowed number of key-value pairs in maps.
                ``-1`` means no limit.
            max_ext_len: Maximum allowed byte length for ext type data.
                ``-1`` means no limit.

        Raises:
            ImportError: If msgpack is not installed.

        """
        # Fail fast with an install hint when the optional dependency is
        # missing.
        if msgpack is UNSET:  # pragma: no cover
            msg = "msgpack is required: `uv add jobify[msgpack]`"
            raise ImportError(msg)

        # Packer configuration.
        self.default = default
        self.use_bin_type = use_bin_type
        self.strict_types = strict_types
        self.datetime = datetime
        self.unicode_errors = unicode_errors
        # Unpacker configuration.
        self.raw = raw
        self.timestamp = timestamp
        self.strict_map_key = strict_map_key
        self.use_list = use_list
        self.object_hook = object_hook
        self.object_pairs_hook = object_pairs_hook
        self.list_hook = list_hook
        self.ext_hook = ext_hook
        self.max_str_len = max_str_len
        self.max_bin_len = max_bin_len
        self.max_array_len = max_array_len
        self.max_map_len = max_map_len
        self.max_ext_len = max_ext_len

    @override
    def dumpb(self, data: JSONCompat) -> bytes:
        """Serialize data to MessagePack bytes.

        Args:
            data: The value to serialize.

        Returns:
            MessagePack-encoded bytes.

        """
        return cast(
            "bytes",
            msgpack.dumps(
                data,
                default=self.default,
                use_bin_type=self.use_bin_type,
                strict_types=self.strict_types,
                datetime=self.datetime,
                unicode_errors=self.unicode_errors,
            ),
        )

    @override
    def loadb(self, data: bytes) -> JSONCompat:
        """Deserialize MessagePack bytes to a Python object.

        Args:
            data: MessagePack-encoded bytes to deserialize.

        Returns:
            The deserialized Python object.

        """
        options: dict[str, Any] = {
            "raw": self.raw,
            "timestamp": self.timestamp,
            "strict_map_key": self.strict_map_key,
            "use_list": self.use_list,
            "object_hook": self.object_hook,
            "object_pairs_hook": self.object_pairs_hook,
            "list_hook": self.list_hook,
            "max_str_len": self.max_str_len,
            "max_bin_len": self.max_bin_len,
            "max_array_len": self.max_array_len,
            "max_map_len": self.max_map_len,
            "max_ext_len": self.max_ext_len,
        }
        # Bug fix: msgpack's own default for ``ext_hook`` is
        # ``msgpack.ExtType``, not ``None``. Forwarding ``None``
        # unconditionally made any payload containing an ext-typed value
        # crash with "'NoneType' object is not callable"; only forward a
        # user-supplied hook so the library default applies otherwise.
        if self.ext_hook is not None:
            options["ext_hook"] = self.ext_hook
        r: JSONCompat = msgpack.loads(data, **options)
        return r

__init__(*, default=None, use_bin_type=True, strict_types=False, datetime=False, unicode_errors=None, raw=False, timestamp=0, strict_map_key=True, use_list=True, object_hook=None, object_pairs_hook=None, list_hook=None, ext_hook=None, max_str_len=-1, max_bin_len=-1, max_array_len=-1, max_map_len=-1, max_ext_len=-1)

Initialize the MessagePack serializer with packer and unpacker options.

Parameters:

Name Type Description Default
default Callable[[Any], Any] | None

Callable invoked for types not natively supported by msgpack. Should return a msgpack-serializable value or raise TypeError if the type cannot be handled.

None
use_bin_type bool

Use the msgpack 2.0 bin type for bytes objects. Also enables str8 type for unicode. Should be True (default) for all modern use; set to False only for compatibility with very old msgpack consumers.

True
strict_types bool

If True, only exact types are serialized; subclasses are forwarded to default. Also prevents tuples from being serialized as arrays.

False
datetime bool

If True, datetime objects with tzinfo are packed as the msgpack Timestamp ext type. The timezone offset is stripped; use timestamp=3 in unpacker options to recover UTC datetime on the other end.

False
unicode_errors str | None

Error handler for encoding unicode strings (e.g. "strict", "replace", "ignore"). Avoid unless you have a specific reason to handle bad unicode.

None
raw bool

If True, unpack msgpack raw bytes to Python bytes instead of decoding to str. Useful when round-tripping data packed with use_bin_type=False.

False
timestamp int

Controls how msgpack Timestamp ext type is deserialized:

  • 0 — return a msgpack.Timestamp object (default).
  • 1 — return a float (seconds since epoch).
  • 2 — return an int (nanoseconds since epoch).
  • 3 — return a UTC-aware datetime.datetime.
0
strict_map_key bool

If True (default), only str or bytes are accepted as map keys, preventing hash-DoS attacks from untrusted input.

True
use_list bool

If True (default), unpack msgpack arrays to Python list. If False, unpack to tuple.

True
object_hook Callable[[dict[Any, Any]], Any] | None

Callable invoked with each deserialized dict. Its return value replaces the dict in output.

None
object_pairs_hook Callable[[list[tuple[Any, Any]]], Any] | None

Callable invoked with a list of (key, value) pairs for each deserialized map. Mutually exclusive with object_hook.

None
list_hook Callable[[list[Any]], Any] | None

Callable invoked with each deserialized list. Its return value replaces the list in output.

None
ext_hook Callable[[int, bytes], Any] | None

Callable invoked for ext types with no built-in decoder. Receives (code, data) and should return a Python object. Defaults to returning msgpack.ExtType(code, data).

None
max_str_len int

Maximum allowed byte length for str values. -1 means no limit.

-1
max_bin_len int

Maximum allowed byte length for bytes values. -1 means no limit.

-1
max_array_len int

Maximum allowed number of elements in arrays. -1 means no limit.

-1
max_map_len int

Maximum allowed number of key-value pairs in maps. -1 means no limit.

-1
max_ext_len int

Maximum allowed byte length for ext type data. -1 means no limit.

-1

Raises:

Type Description
ImportError

If msgpack is not installed.

Source code in src/jobify/serializers/msgpack.py
def __init__(  # noqa: PLR0913
    self,
    *,
    # Packer options
    default: Callable[[Any], Any] | None = None,
    use_bin_type: bool = True,
    strict_types: bool = False,
    datetime: bool = False,
    unicode_errors: str | None = None,
    # Unpacker options
    raw: bool = False,
    timestamp: int = 0,
    strict_map_key: bool = True,
    use_list: bool = True,
    object_hook: Callable[[dict[Any, Any]], Any] | None = None,
    object_pairs_hook: Callable[[list[tuple[Any, Any]]], Any] | None = None,
    list_hook: Callable[[list[Any]], Any] | None = None,
    ext_hook: Callable[[int, bytes], Any] | None = None,
    max_str_len: int = -1,
    max_bin_len: int = -1,
    max_array_len: int = -1,
    max_map_len: int = -1,
    max_ext_len: int = -1,
) -> None:
    """Initialize the MessagePack serializer with packer and unpacker options.

    Args:
        default: Callable invoked for types not natively supported by
            msgpack. Should return a msgpack-serializable value or raise
            ``TypeError`` if the type cannot be handled.
        use_bin_type: Use the msgpack 2.0 ``bin`` type for ``bytes``
            objects. Also enables ``str8`` type for unicode. Should be
            ``True`` (default) for all modern use; set to ``False`` only
            for compatibility with very old msgpack consumers.
        strict_types: If ``True``, only exact types are serialized;
            subclasses are forwarded to ``default``. Also prevents tuples
            from being serialized as arrays.
        datetime: If ``True``, ``datetime`` objects with ``tzinfo`` are
            packed as the msgpack ``Timestamp`` ext type. The timezone
            offset is stripped; use ``timestamp=3`` in unpacker options
            to recover UTC ``datetime`` on the other end.
        unicode_errors: Error handler for encoding unicode strings
            (e.g. ``"strict"``, ``"replace"``, ``"ignore"``). Avoid
            unless you have a specific reason to handle bad unicode.
        raw: If ``True``, unpack msgpack ``raw`` bytes to Python ``bytes``
            instead of decoding to ``str``. Useful when round-tripping
            data packed with ``use_bin_type=False``.
        timestamp: Controls how msgpack ``Timestamp`` ext type is
            deserialized:

            - ``0`` — return a ``msgpack.Timestamp`` object (default).
            - ``1`` — return a ``float`` (seconds since epoch).
            - ``2`` — return an ``int`` (nanoseconds since epoch).
            - ``3`` — return a UTC-aware ``datetime.datetime``.

        strict_map_key: If ``True`` (default), only ``str`` or ``bytes``
            are accepted as map keys, preventing hash-DoS attacks from
            untrusted input.
        use_list: If ``True`` (default), unpack msgpack arrays to Python
            ``list``. If ``False``, unpack to ``tuple``.
        object_hook: Callable invoked with each deserialized ``dict``.
            Its return value replaces the dict in output.
        object_pairs_hook: Callable invoked with a list of ``(key, value)``
            pairs for each deserialized map. Mutually exclusive with
            ``object_hook``.
        list_hook: Callable invoked with each deserialized ``list``. Its
            return value replaces the list in output.
        ext_hook: Callable invoked for ext types with no built-in decoder.
            Receives ``(code, data)`` and should return a Python object.
            ``None`` (the default) selects msgpack's built-in behavior of
            returning ``msgpack.ExtType(code, data)``.
        max_str_len: Maximum allowed byte length for ``str`` values.
            ``-1`` means no limit.
        max_bin_len: Maximum allowed byte length for ``bytes`` values.
            ``-1`` means no limit.
        max_array_len: Maximum allowed number of elements in arrays.
            ``-1`` means no limit.
        max_map_len: Maximum allowed number of key-value pairs in maps.
            ``-1`` means no limit.
        max_ext_len: Maximum allowed byte length for ext type data.
            ``-1`` means no limit.

    Raises:
        ImportError: If msgpack is not installed.

    """
    if msgpack is UNSET:  # pragma: no cover
        msg = "msgpack is required: `uv add jobify[msgpack]`"
        raise ImportError(msg)

    self.default = default
    self.use_bin_type = use_bin_type
    self.strict_types = strict_types
    self.datetime = datetime
    self.unicode_errors = unicode_errors
    self.raw = raw
    self.timestamp = timestamp
    self.strict_map_key = strict_map_key
    self.use_list = use_list
    self.object_hook = object_hook
    self.object_pairs_hook = object_pairs_hook
    self.list_hook = list_hook
    # BUG FIX: msgpack's own default for ``ext_hook`` is ``msgpack.ExtType``,
    # not ``None``.  Forwarding a plain ``None`` to ``msgpack.loads`` would
    # crash on the first ext value ("'NoneType' object is not callable"),
    # so normalize here.  (Safe: the ImportError guard above already ran.)
    self.ext_hook = ext_hook if ext_hook is not None else msgpack.ExtType
    self.max_str_len = max_str_len
    self.max_bin_len = max_bin_len
    self.max_array_len = max_array_len
    self.max_map_len = max_map_len
    self.max_ext_len = max_ext_len

dumpb(data)

Serialize data to MessagePack bytes.

Parameters:

Name Type Description Default
data JSONCompat

The value to serialize.

required

Returns:

Type Description
bytes

MessagePack-encoded bytes.

Source code in src/jobify/serializers/msgpack.py
@override
def dumpb(self, data: JSONCompat) -> bytes:
    """Pack ``data`` into MessagePack bytes.

    Args:
        data: The value to serialize.

    Returns:
        MessagePack-encoded bytes.

    """
    packed = msgpack.dumps(
        data,
        default=self.default,
        use_bin_type=self.use_bin_type,
        strict_types=self.strict_types,
        datetime=self.datetime,
        unicode_errors=self.unicode_errors,
    )
    # msgpack.dumps is typed loosely; narrow to bytes for the Serializer API.
    return cast("bytes", packed)

loadb(data)

Deserialize MessagePack bytes to a Python object.

Parameters:

Name Type Description Default
data bytes

MessagePack-encoded bytes to deserialize.

required

Returns:

Type Description
JSONCompat

The deserialized Python object.

Source code in src/jobify/serializers/msgpack.py
@override
def loadb(self, data: bytes) -> JSONCompat:
    """Deserialize MessagePack bytes to a Python object.

    Args:
        data: MessagePack-encoded bytes to deserialize.

    Returns:
        The deserialized Python object.

    """
    # BUG FIX: msgpack's native default for ``ext_hook`` is
    # ``msgpack.ExtType``, not ``None``.  When no custom hook was
    # configured, fall back to that default instead of forwarding ``None``,
    # which would raise "'NoneType' object is not callable" on the first
    # ext value encountered.
    ext_hook = self.ext_hook if self.ext_hook is not None else msgpack.ExtType
    r: JSONCompat = msgpack.loads(
        data,
        raw=self.raw,
        timestamp=self.timestamp,
        strict_map_key=self.strict_map_key,
        use_list=self.use_list,
        object_hook=self.object_hook,
        object_pairs_hook=self.object_pairs_hook,
        list_hook=self.list_hook,
        ext_hook=ext_hook,
        max_str_len=self.max_str_len,
        max_bin_len=self.max_bin_len,
        max_array_len=self.max_array_len,
        max_map_len=self.max_map_len,
        max_ext_len=self.max_ext_len,
    )
    return r

OrjsonSerializer

Bases: Serializer

Serialize and deserialize data using the orjson library.

See https://github.com/ijl/orjson for more information.

Source code in src/jobify/serializers/orjson.py
class OrjsonSerializer(Serializer):
    """JSON serializer backed by the orjson library.

    See https://github.com/ijl/orjson for more information.
    """

    def __init__(
        self,
        *,
        default: Callable[[Any], Any] | None = None,
        option: int | None = None,
    ) -> None:
        """Set up the orjson serializer.

        Args:
            default: Fallback callable for values orjson cannot encode
                natively; must return a JSON-serializable value or raise
                ``TypeError`` for unsupported types.
            option: Bitmask of ``orjson.OPT_*`` flags, combined with ``|``,
                e.g. ``orjson.OPT_NAIVE_UTC | orjson.OPT_NON_STR_KEYS``.

        Raises:
            ImportError: If orjson is not installed.

        """
        # orjson is an optional dependency; fail fast with install advice.
        if orjson is UNSET:  # pragma: no cover
            error_message = "orjson is required: `uv add jobify[orjson]`"
            raise ImportError(error_message)

        self.default = default
        self.option = option

    @override
    def dumpb(self, data: JSONCompat) -> bytes:
        """Encode ``data`` as JSON bytes.

        Args:
            data: The value to serialize.

        Returns:
            JSON-encoded bytes.

        """
        encoded: bytes = orjson.dumps(data, default=self.default, option=self.option)
        return encoded

    @override
    def loadb(self, data: bytes) -> JSONCompat:
        """Decode JSON bytes into a Python object.

        Args:
            data: JSON-encoded bytes to deserialize.

        Returns:
            The deserialized Python object.

        """
        result: JSONCompat = orjson.loads(data)
        return result

__init__(*, default=None, option=None)

Initialize the orjson serializer.

Parameters:

Name Type Description Default
default Callable[[Any], Any] | None

Callable invoked for types not natively supported by orjson. Should return a JSON-serializable value or raise TypeError if the type cannot be handled.

None
option int | None

Bitmask of orjson.OPT_* flags controlling serialization behavior. Multiple flags are combined with |, e.g. orjson.OPT_NAIVE_UTC | orjson.OPT_NON_STR_KEYS.

None

Raises:

Type Description
ImportError

If orjson is not installed.

Source code in src/jobify/serializers/orjson.py
def __init__(
    self,
    *,
    default: Callable[[Any], Any] | None = None,
    option: int | None = None,
) -> None:
    """Set up the orjson serializer.

    Args:
        default: Fallback callable for values orjson cannot encode
            natively; must return a JSON-serializable value or raise
            ``TypeError`` for unsupported types.
        option: Bitmask of ``orjson.OPT_*`` flags, combined with ``|``,
            e.g. ``orjson.OPT_NAIVE_UTC | orjson.OPT_NON_STR_KEYS``.

    Raises:
        ImportError: If orjson is not installed.

    """
    # orjson is an optional dependency; fail fast with install advice.
    if orjson is UNSET:  # pragma: no cover
        error_message = "orjson is required: `uv add jobify[orjson]`"
        raise ImportError(error_message)

    self.default = default
    self.option = option

dumpb(data)

Serialize data to JSON bytes.

Parameters:

Name Type Description Default
data JSONCompat

The value to serialize.

required

Returns:

Type Description
bytes

JSON-encoded bytes.

Source code in src/jobify/serializers/orjson.py
@override
def dumpb(self, data: JSONCompat) -> bytes:
    """Encode ``data`` as JSON bytes.

    Args:
        data: The value to serialize.

    Returns:
        JSON-encoded bytes.

    """
    encoded: bytes = orjson.dumps(data, default=self.default, option=self.option)
    return encoded

loadb(data)

Deserialize JSON bytes to a Python object.

Parameters:

Name Type Description Default
data bytes

JSON-encoded bytes to deserialize.

required

Returns:

Type Description
JSONCompat

The deserialized Python object.

Source code in src/jobify/serializers/orjson.py
@override
def loadb(self, data: bytes) -> JSONCompat:
    """Decode JSON bytes into a Python object.

    Args:
        data: JSON-encoded bytes to deserialize.

    Returns:
        The deserialized Python object.

    """
    result: JSONCompat = orjson.loads(data)
    return result

JSONSerializer

Bases: Serializer

Source code in src/jobify/_internal/serializers/json.py
class JSONSerializer(Serializer):
    """Serializer backed by the standard-library ``json`` module (UTF-8)."""

    @override
    def dumpb(self, data: JSONCompat) -> bytes:
        """Encode ``data`` as UTF-8 JSON bytes."""
        text = json.dumps(data)
        return text.encode("utf-8")

    @override
    def loadb(self, data: bytes) -> JSONCompat:
        """Decode UTF-8 JSON bytes into a Python object."""
        result: JSONCompat = json.loads(data)
        return result

ExtendedJSONSerializer

Bases: Serializer

Source code in src/jobify/_internal/serializers/json_extended.py
class ExtendedJSONSerializer(Serializer):
    """JSON serializer with a type registry for round-tripping rich types.

    Encoding goes through ``json_extended_encoder``; decoding re-instantiates
    registered types through a ``JsonDecoderHook`` backed by ``self.registry``.
    """

    def __init__(
        self,
        registry: Sequence[Callable[..., SupportedTypes]] = (),
    ) -> None:
        # Maps type name -> constructor.  The same dict object is handed to
        # the decoder hook, so types registered later are visible to it too.
        self.registry: TypeRegistry = {}
        self.add_system_types(registry)
        self.decoder_hook: JsonDecoderHook = JsonDecoderHook(self.registry)

    def add_system_types(self, tp: Sequence[Callable[..., SupportedTypes]], /) -> None:
        """Register constructors under their ``__name__`` (later entries win)."""
        self.registry.update({t.__name__: t for t in tp})

    @override
    def dumpb(self, data: SupportedTypes) -> bytes:
        """Encode ``data`` (via the extended encoder) as UTF-8 JSON bytes."""
        return json.dumps(json_extended_encoder(data)).encode("utf-8")

    @override
    def loadb(self, data: bytes) -> SupportedTypes:
        """Decode JSON bytes, rebuilding registered types via the hook."""
        r: SupportedTypes = json.loads(data, object_hook=self.decoder_hook)
        return r

    def register_hints(self, types: Iterable[Any]) -> None:
        """Recursively register structured types discovered in type hints.

        Skips anything ``is_structured_type`` rejects, skips ``JobContext``
        by name (NOTE(review): presumably injected by the framework rather
        than serialized — confirm), unwraps generic aliases into their
        arguments, and ignores names already present in the registry.
        """
        for tp in types:
            if not is_structured_type(tp):
                continue
            if getattr(tp, "__name__", None) == "JobContext":
                continue
            # Generic alias (e.g. list[X]): register its arguments, not itself.
            if args := get_args(tp):
                self.register_hints(args)
                continue
            if tp.__name__ in self.registry:
                continue
            self.registry[tp.__name__] = tp
            # Also walk the type's own annotations for nested structured types.
            self.register_hints(get_type_hints(tp).values())

UnsafePickleSerializer

Bases: Serializer

Source code in src/jobify/_internal/serializers/pickle_unsafe.py
class UnsafePickleSerializer(Serializer):
    """Serialize arbitrary Python objects with ``pickle``.

    WARNING: unpickling executes arbitrary code.  Only use this serializer
    when the serialized payloads come from a fully trusted source — hence
    the "Unsafe" in the name and the suppressed security lints below.
    """

    @override
    def dumpb(self, data: Any) -> bytes:
        """Pickle ``data`` to bytes."""
        # nosemgrep: python.lang.security.deserialization.pickle.avoid-pickle
        return pickle.dumps(data)

    @override
    def loadb(self, data: bytes) -> Any:
        """Unpickle bytes back into a Python object (trusted input only)."""
        # nosemgrep: python.lang.security.deserialization.pickle.avoid-pickle
        return pickle.loads(data)  # noqa: S301 # nosec B301

jobify.typeadapter

Type adaptation and serialization utilities.

This module provides base classes and interfaces for converting Python types to serializable formats and back. It serves as the foundation for type adaptation, leveraging implementations from pydantic and adaptix.

The module exports the main type adaptation interfaces: - Dumper: Base class for serializing Python objects to JSON-compatible types - Loader: Base class for deserializing JSON-compatible types to Python objects

Dumper

Bases: Protocol

Interface for dumping objects into a serializable format.

Source code in src/jobify/_internal/typeadapter/base.py
class Dumper(Protocol):
    """Protocol for converting objects into a serializable representation."""

    def dump(self, data: Any, tp: Any, /) -> Any:  # noqa: ANN401
        """Serialize ``data`` according to the type ``tp``."""
        # Structural interface only; concrete converters supply the body.
        raise NotImplementedError

dump(data, tp)

Dump object data based on type tp.

Source code in src/jobify/_internal/typeadapter/base.py
def dump(self, data: Any, tp: Any, /) -> Any:  # noqa: ANN401
    """Serialize ``data`` according to the type ``tp``."""
    raise NotImplementedError

Loader

Bases: Protocol

Interface for loading data into typed objects.

Source code in src/jobify/_internal/typeadapter/base.py
class Loader(Protocol):
    """Protocol for constructing typed objects from raw data."""

    def load(self, data: Any, tp: type[T], /) -> T:  # noqa: ANN401
        """Build an instance of ``tp`` from ``data``."""
        # Structural interface only; concrete converters supply the body.
        raise NotImplementedError

load(data, tp)

Load data into the specified type tp.

Source code in src/jobify/_internal/typeadapter/base.py
def load(self, data: Any, tp: type[T], /) -> T:  # noqa: ANN401
    """Build an instance of ``tp`` from ``data``."""
    raise NotImplementedError

PydanticConverter

Bases: Loader, Dumper

Load and dump data using pydantic's TypeAdapter.

See https://docs.pydantic.dev/latest/api/type_adapter/ for more information.

Source code in src/jobify/typeadapter/pydantic.py
class PydanticConverter(Loader, Dumper):
    """Load and dump data via pydantic ``TypeAdapter`` instances.

    See https://docs.pydantic.dev/latest/api/type_adapter/ for more information.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        config: pydantic.ConfigDict | None = None,
        strict: bool | None = None,
        from_attributes: bool | None = None,
        by_alias: bool = False,
        exclude_none: bool = False,
        context: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the pydantic converter.

        Args:
            config: Pydantic ``ConfigDict`` applied to every ``TypeAdapter``
                this converter builds.  Must be omitted when the target type
                carries its own config (``BaseModel``, ``TypedDict``,
                dataclass).
            strict: When ``True``, validation rejects coercions and demands
                exact types.
            from_attributes: When ``True``, validation may read object
                attributes as well as dict keys — handy for ORM objects such
                as SQLAlchemy models.
            by_alias: When ``True``, serialized output uses field
                serialization aliases instead of attribute names.
            exclude_none: When ``True``, ``None``-valued fields are dropped
                from serialized output.
            context: Opaque object forwarded to validators/serializers that
                accept a ``ValidationInfo`` or ``SerializationInfo`` argument.

        Raises:
            ImportError: If pydantic is not installed.

        """
        # pydantic is an optional dependency; fail fast with install advice.
        if pydantic is UNSET:  # pragma: no cover
            error = "pydantic is required: `uv add jobify[pydantic]`"
            raise ImportError(error)

        self._config = config
        self.strict = strict
        self.from_attributes = from_attributes
        self.by_alias = by_alias
        self.exclude_none = exclude_none
        self.context = context
        # TypeAdapter construction is relatively costly; memoize per type.
        self._cache_adapters: dict[Hashable, pydantic.TypeAdapter[Any]] = {}

    def _adapter(self, tp: type[T]) -> pydantic.TypeAdapter[T]:
        # EAFP: the cache hit is the common path after warm-up.
        try:
            return self._cache_adapters[tp]
        except KeyError:
            created = pydantic.TypeAdapter(tp, config=self._config)
            self._cache_adapters[tp] = created
            return created

    @override
    def load(self, data: Any, tp: type[T], /) -> T:
        """Validate and coerce ``data`` into the type ``tp``.

        Args:
            data: The raw input to validate.
            tp: The target type to validate against.

        Returns:
            The validated and coerced object of type ``tp``.

        """
        adapter = self._adapter(tp)
        return adapter.validate_python(
            data,
            strict=self.strict,
            from_attributes=self.from_attributes,
            context=self.context,
        )

    @override
    def dump(self, data: Any, tp: Any, /) -> Any:
        """Serialize ``data`` to a JSON-compatible Python structure.

        Rich values — dataclasses, pydantic models, enums, datetimes — come
        out as plain ``dict``/``list``/``str``/``int``/``float``/``bool``/
        ``None``, ready for any binary serializer (orjson, cbor2, msgpack).

        Args:
            data: The object to serialize.
            tp: The type whose pydantic schema governs serialization.

        Returns:
            A JSON-compatible Python object.

        """
        adapter = self._adapter(tp)
        return adapter.dump_python(
            data,
            mode="json",
            by_alias=self.by_alias,
            exclude_none=self.exclude_none,
            context=self.context,
        )

__init__(*, config=None, strict=None, from_attributes=None, by_alias=False, exclude_none=False, context=None)

Initialize the pydantic converter.

Parameters:

Name Type Description Default
config ConfigDict | None

Pydantic ConfigDict passed to each TypeAdapter. Cannot be used when the target type already defines its own config (e.g. BaseModel, TypedDict, dataclass).

None
strict bool | None

If True, disable coercions and require exact types during validation.

None
from_attributes bool | None

If True, extract data from object attributes as well as dict keys during validation. Useful when loading from ORM objects such as SQLAlchemy models.

None
by_alias bool

If True, use field serialization aliases as output keys instead of Python attribute names.

False
exclude_none bool

If True, omit fields whose value is None from the serialized output.

False
context dict[str, Any] | None

Arbitrary context object passed to validators and serializers that accept a ValidationInfo or SerializationInfo argument.

None

Raises:

Type Description
ImportError

If pydantic is not installed.

Source code in src/jobify/typeadapter/pydantic.py
def __init__(  # noqa: PLR0913
    self,
    *,
    config: pydantic.ConfigDict | None = None,
    strict: bool | None = None,
    from_attributes: bool | None = None,
    by_alias: bool = False,
    exclude_none: bool = False,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize the pydantic converter.

    Args:
        config: Pydantic ``ConfigDict`` passed to each ``TypeAdapter``.
            Cannot be used when the target type already defines its own
            config (e.g. ``BaseModel``, ``TypedDict``, ``dataclass``).
        strict: If ``True``, disable coercions and require exact types
            during validation.
        from_attributes: If ``True``, extract data from object attributes
            as well as dict keys during validation. Useful when loading
            from ORM objects such as SQLAlchemy models.
        by_alias: If ``True``, use field serialization aliases as output
            keys instead of Python attribute names.
        exclude_none: If ``True``, omit fields whose value is ``None``
            from the serialized output.
        context: Arbitrary context object passed to validators and
            serializers that accept a ``ValidationInfo`` or
            ``SerializationInfo`` argument.

    Raises:
        ImportError: If pydantic is not installed.

    """
    # pydantic is an optional dependency; fail fast with install advice.
    if pydantic is UNSET:  # pragma: no cover
        msg = "pydantic is required: `uv add jobify[pydantic]`"
        raise ImportError(msg)

    self._config = config
    self.strict = strict
    self.from_attributes = from_attributes
    self.by_alias = by_alias
    self.exclude_none = exclude_none
    self.context = context
    # TypeAdapter construction is relatively costly; memoized per type.
    self._cache_adapters: dict[Hashable, pydantic.TypeAdapter[Any]] = {}

dump(data, tp)

Serialize an object to a JSON-compatible Python structure.

Converts complex types such as dataclasses, Pydantic models, enums, and datetimes into plain dict, list, str, int, float, bool, or None values suitable for any binary serializer (e.g. orjson, cbor2, msgpack).

Parameters:

Name Type Description Default
data Any

The object to serialize.

required
tp Any

The type whose pydantic schema governs serialization.

required

Returns:

Type Description
Any

A JSON-compatible Python object.

Source code in src/jobify/typeadapter/pydantic.py
@override
def dump(self, data: Any, tp: Any, /) -> Any:
    """Serialize ``data`` to a JSON-compatible Python structure.

    Rich values — dataclasses, pydantic models, enums, datetimes — come
    out as plain ``dict``/``list``/``str``/``int``/``float``/``bool``/
    ``None``, ready for any binary serializer (orjson, cbor2, msgpack).

    Args:
        data: The object to serialize.
        tp: The type whose pydantic schema governs serialization.

    Returns:
        A JSON-compatible Python object.

    """
    adapter = self._adapter(tp)
    return adapter.dump_python(
        data,
        mode="json",
        by_alias=self.by_alias,
        exclude_none=self.exclude_none,
        context=self.context,
    )

load(data, tp)

Validate and coerce data into the specified type.

Parameters:

Name Type Description Default
data Any

The raw input to validate.

required
tp type[T]

The target type to validate against.

required

Returns:

Type Description
T

The validated and coerced object of type tp.

Source code in src/jobify/typeadapter/pydantic.py
@override
def load(self, data: Any, tp: type[T], /) -> T:
    """Validate and coerce ``data`` into the type ``tp``.

    Args:
        data: The raw input to validate.
        tp: The target type to validate against.

    Returns:
        The validated and coerced object of type ``tp``.

    """
    adapter = self._adapter(tp)
    return adapter.validate_python(
        data,
        strict=self.strict,
        from_attributes=self.from_attributes,
        context=self.context,
    )

jobify.router

Module with all the main stuff for organizing and managing tasks in Jobify.

Includes JobRouter for grouping tasks into logical units, and RootRoute and NodeRoute classes as basis for task execution and hierarchical routing.

Route

Bases: ABC, Generic[ParamsT, Return_co]

Source code in src/jobify/_internal/router/base.py
class Route(ABC, Generic[ParamsT, Return_co]):
    """Abstract base for a registered job: a named callable plus options.

    Subclasses implement `schedule` (build a deferred schedule) and `push`
    (enqueue for execution) against a concrete backend.
    """

    def __init__(
        self,
        name: str,
        func: Callable[ParamsT, Return_co],
        options: RouteOptions,
    ) -> None:
        self.name: str = name  # route identifier
        self.func: Callable[ParamsT, Return_co] = func  # the wrapped job callable
        self.options: RouteOptions = options  # per-route configuration

    def __call__(
        self,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> Return_co:
        """Invoke the wrapped function directly, bypassing scheduling."""
        return self.func(*args, **kwargs)

    # Overload: async job — the builder is parameterized by the awaited
    # result type rather than the coroutine itself.
    @overload
    def schedule(
        self: Route[ParamsT, Coroutine[object, object, T_co]],
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> ScheduleBuilder[T_co]: ...

    # Overload: sync job — the builder carries the plain return type.
    @overload
    def schedule(
        self: Route[ParamsT, Return_co],
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> ScheduleBuilder[Return_co]: ...

    @abstractmethod
    def schedule(
        self,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> ScheduleBuilder[Any]:
        """Return a builder describing when/how to run the job with these args."""
        raise NotImplementedError

    # Overload: async job — the Job is parameterized by the awaited result.
    @overload
    async def push(
        self: Route[ParamsT, Coroutine[object, object, T_co]],
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> Job[T_co]: ...

    # Overload: sync job — the Job carries the plain return type.
    @overload
    async def push(
        self: Route[ParamsT, Return_co],
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> Job[Return_co]: ...

    @abstractmethod
    async def push(
        self,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> Job[Any]:
        """Enqueue the job with these args and return its Job handle."""
        raise NotImplementedError

JobRouter

Bases: Router

Router for organizing jobs and sub-routers.

NodeRouter (documented here under the JobRouter heading) is a container for routes and sub-routers, providing a way to structure large applications by grouping related jobs together.

Parameters:

Name Type Description Default
state State | None

Optional initial state for the router.

None
prefix str | None

Optional prefix for route names.

None
lifespan Lifespan[NodeRouter_co] | None

Optional lifespan handler for the router.

None
middleware Sequence[BaseMiddleware] | None

Middleware to apply to jobs in this router.

None
outer_middleware Sequence[BaseOuterMiddleware] | None

Middleware to apply to scheduling process.

None
exception_handlers MappingExceptionHandlers | None

Exception handlers for jobs.

None
route_class type[NodeRoute[..., Any]]

Class to use for creating new routes.

NodeRoute
Source code in src/jobify/_internal/router/node.py
class NodeRouter(Router):
    """Router for organizing jobs and sub-routers.

    `NodeRouter` is a container for routes and sub-routers, providing
    a way to structure large applications by grouping related jobs together.

    Args:
        state: Optional initial state for the router.
        prefix: Optional prefix for route names.
        lifespan: Optional lifespan handler for the router.
        middleware: Middleware to apply to jobs in this router.
        outer_middleware: Middleware to apply to scheduling process.
        exception_handlers: Exception handlers for jobs.
        route_class: Class to use for creating new routes.

    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        state: State | None = None,
        prefix: str | None = None,
        lifespan: Lifespan[NodeRouter_co] | None = None,
        middleware: Sequence[BaseMiddleware] | None = None,
        outer_middleware: Sequence[BaseOuterMiddleware] | None = None,
        exception_handlers: MappingExceptionHandlers | None = None,
        route_class: type[NodeRoute[..., Any]] = NodeRoute,
    ) -> None:
        super().__init__(prefix=prefix)
        # All registration concerns are delegated to the registrator.
        self._registrator: NodeRegistrator = NodeRegistrator(
            state=state,
            lifespan=lifespan,
            middleware=middleware,
            outer_middleware=outer_middleware,
            exception_handlers=exception_handlers,
            route_class=route_class,
        )
        # Re-expose registrator state on the router for convenient access.
        self.state: State = self._registrator.state
        # NOTE(review): reaches into the registrator's private attribute;
        # consider a public accessor on NodeRegistrator.
        self.exception_handlers: ExceptionHandlers = (
            self._registrator._exception_handlers
        )

    @property
    @override
    def task(self) -> NodeRegistrator:
        """Registrator used to declare jobs on this router."""
        return self._registrator

    @property
    @override
    def routes(self) -> Iterator[NodeRoute[..., Any]]:
        """Yield every route registered on this router."""
        yield from self.task._routes.values()

    @property
    @override
    def sub_routers(self) -> list[NodeRouter]:
        """Nested routers included under this one."""
        return cast("list[NodeRouter]", self._sub_routers)

NodeRoute

Bases: Route[ParamsT, ReturnT]

Source code in src/jobify/_internal/router/node.py
class NodeRoute(Route[ParamsT, ReturnT]):
    """Route placeholder used by routers before attachment to an app.

    `schedule` and `push` delegate to the real route supplied via `bind()`
    (which, per the error message below, happens when the router is included
    into a Jobify app via ``app.include_router()``).
    """

    def __init__(
        self,
        *,
        name: str,
        func: Callable[ParamsT, ReturnT],
        options: RouteOptions,
    ) -> None:
        super().__init__(name, func, options)
        # Populated by bind(); None means "not attached to an app yet".
        self._real_route: Route[ParamsT, ReturnT] | None = None

    @property
    def real_route(self) -> Route[ParamsT, ReturnT]:
        """Return the bound route, raising if the router was never attached."""
        if self._real_route is None:
            msg = (
                f"Job {self.name!r} is not attached to any Jobify app."
                " Did you forget to call app.include_router()?"
            )
            raise RuntimeError(msg)
        return self._real_route

    def bind(self, route: Route[ParamsT, ReturnT]) -> None:
        """Attach the concrete route that scheduling calls delegate to."""
        self._real_route = route

    @override
    def schedule(
        self,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> ScheduleBuilder[ReturnT]:
        """Delegate scheduling to the bound real route."""
        return self.real_route.schedule(*args, **kwargs)

    @override
    async def push(
        self,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> Job[ReturnT]:
        """Delegate immediate enqueueing to the bound real route."""
        return await self.real_route.push(*args, **kwargs)

RootRoute

Bases: Route[ParamsT, ReturnT]

Source code in src/jobify/_internal/router/root.py
class RootRoute(Route[ParamsT, ReturnT]):
    """Concrete route owned by the application root.

    Holds the run strategy, middleware chains, and configuration needed to
    build ``ScheduleBuilder`` instances for a registered job function.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        name: str,
        func: Callable[ParamsT, ReturnT],
        func_spec: FuncSpec[ReturnT],
        state: State,
        options: RouteOptions,
        strategy: RunStrategy[ParamsT, ReturnT],
        task_tracker: TaskTracker,
        exception_handlers: ExceptionHandlers,
        jobify_config: JobifyConfiguration,
    ) -> None:
        """Initialize the route and, when required, patch the wrapped function.

        Args:
            name: Unique route name.
            func: The job callable to execute.
            func_spec: Pre-computed signature/spec for ``func``.
            state: Shared application state.
            options: Per-route options.
            strategy: Execution strategy used to run the job.
            task_tracker: Tracker for in-flight tasks.
            exception_handlers: Handlers consulted when a job fails.
            jobify_config: Application-wide configuration.

        """
        super().__init__(name, func, options)
        self._run_strategy: RunStrategy[ParamsT, ReturnT] = strategy
        self._task_state: TaskTracker = task_tracker
        # Middleware chains are wired later (after app startup); see
        # create_builder(), which refuses to run while these are still None.
        self._chain_middleware: CallNext | None = None
        self._chain_outer_middleware: CallNextOuter | None = None
        self._exception_handlers: ExceptionHandlers = exception_handlers
        self._is_persist: bool = _is_persist(jobify_config, options)
        self.state: State = state
        self.func_spec: FuncSpec[ReturnT] = func_spec
        self.jobify_config: JobifyConfiguration = jobify_config

        # --------------------------------------------------------------------
        # HACK: ProcessPoolExecutor / Multiprocessing  # noqa: ERA001, FIX004
        #
        # Problem: `ProcessPoolExecutor` (used for ExecutionMode.PROCESS)
        # serializes the function by its name. When we use `@register`
        # as a decorator, the function's name in the module (`my_func`)
        # now points to the `FuncWrapper` object, not the original function.
        # This breaks `pickle`.
        #
        # Solution: We rename the *original* function (adding a suffix)
        # and "inject" it back into its own module under this new
        # name. This way, `ProcessPoolExecutor` can find and pickle it.
        #
        # We DO NOT apply this hack in two cases (Guard Clauses):
        # 1. If `register` is used as a direct function call (`reg(my_func)`),
        #    because `my_func` in the module still points to the original.
        # 2. If the function has already been renamed (protects from re-entry).
        # --------------------------------------------------------------------

        # Guard 1: Protect against double-renaming
        if func.__name__.endswith(PATCH_FUNC_NAME):
            return

        # Guard 2: Check if `register` is used as a decorator (@)
        # or as a direct function call.
        module = sys.modules[func.__module__]
        module_attr = getattr(module, func.__name__, None)
        if module_attr is func:
            return

        # Apply the hack: rename and inject back into the module
        new_name = f"{func.__name__}{PATCH_FUNC_NAME}"
        func.__name__ = new_name
        if hasattr(func, "__qualname__"):  # pragma: no cover
            # Keep __qualname__ consistent with the new __name__ so pickling
            # by qualified name resolves to the injected module attribute.
            original_qualname = func.__qualname__.rsplit(".", 1)
            original_qualname[-1] = new_name
            new_qualname = ".".join(original_qualname)
            func.__qualname__ = new_qualname
        setattr(module, new_name, func)

    @override
    def schedule(
        self,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> ScheduleBuilder[ReturnT]:
        """Bind the call arguments and return a builder for deferred scheduling.

        Raises:
            TypeError: If ``args``/``kwargs`` do not match the job signature.

        """
        bound = self.func_spec.signature.bind(*args, **kwargs)
        return self.create_builder(bound)

    @override
    async def push(self, *args: ParamsT.args, **kwargs: ParamsT.kwargs) -> Job[ReturnT]:
        """Bind the call arguments and immediately push the job for execution."""
        bound = self.func_spec.signature.bind(*args, **kwargs)
        return await self.create_builder(bound).push()

    def create_builder(self, bound: inspect.BoundArguments) -> ScheduleBuilder[ReturnT]:
        """Assemble a ``ScheduleBuilder`` for the given bound arguments.

        The app must be started and both middleware chains wired; otherwise
        the helper below raises (NOTE(review): assumed NoReturn — verify).
        """
        if (
            self.jobify_config.app_started is False
            or self._chain_middleware is None
            or self._chain_outer_middleware is None
        ):
            raise_app_not_started_error("schedule")
        return ScheduleBuilder(
            name=self.name,
            state=self.state,
            options=self.options,
            func_spec=self.func_spec,
            is_persist=self._is_persist,
            task_tracker=self._task_state,
            jobify_config=self.jobify_config,
            chain_middleware=self._chain_middleware,
            exception_handlers=self._exception_handlers,
            chain_outer_middleware=self._chain_outer_middleware,
            runnable=Runnable(
                name=self.name,
                bound=bound,
                strategy=self._run_strategy,
                func_spec=self.func_spec,
            ),
        )

jobify.middleware

Middleware system for job execution pipeline.

This module provides the base interface for creating middleware that can intercept and process jobs before they reach their final destination. Middleware can be composed into a processing chain, where each middleware can:

  • Execute code before the job is handled
  • Pass control to the next middleware in the pipeline
  • Execute code after the job has been handled
  • Modify the state or the result
  • Break out of the chain early by not calling call_next()

CallNext = Callable[[JobContext], Awaitable[Any]] module-attribute

CallNextOuter = Callable[[OuterContext], Awaitable[asyncio.Handle]] module-attribute

BaseMiddleware

Bases: Protocol

Source code in src/jobify/_internal/middleware/base.py
class BaseMiddleware(Protocol):
    """Structural type for inner middleware in the job execution pipeline."""

    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:  # noqa: ANN401
        ...

BaseOuterMiddleware

Bases: Protocol

Source code in src/jobify/_internal/middleware/base.py
class BaseOuterMiddleware(Protocol):
    """Structural type for outer middleware wrapping job scheduling."""

    async def __call__(self, call_next: CallNextOuter, context: OuterContext) -> Any:  # noqa: ANN401
        ...

JobifyQueue

Bases: Protocol

Source code in src/jobify/_internal/middleware/queue.py
class JobifyQueue(Protocol):
    """Structural interface for queues consumed by ``QueueMiddleware``."""

    async def get(self) -> Item:
        """Remove and return the next item from the queue."""
        ...

    async def put(self, item: Item) -> None:
        """Add *item* to the queue."""
        ...

    def task_done(self) -> None:
        """Signal that a previously dequeued item has been processed."""
        ...

QueueMiddleware

Bases: BaseMiddleware

Source code in src/jobify/_internal/middleware/queue.py
class QueueMiddleware(BaseMiddleware):
    """Middleware that funnels job execution through a bounded worker pool.

    Each incoming job is wrapped in a priority item and enqueued; a fixed
    number of worker tasks drain the queue and resolve the caller's future
    with the job's result (or the raised exception).
    """

    def __init__(self, queue: JobifyQueue = UNSET, workers: int = 100) -> None:
        self.queue: JobifyQueue = (
            PriorityQueue(maxsize=1024) if queue is UNSET else queue
        )
        self.workers: int = workers
        self._workers: tuple[asyncio.Task[None], ...] = UNSET

    @override
    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
        metadata = context.route_options.get("metadata", {})
        priority = metadata.get("priority", 0)
        # Negate so that a higher user priority sorts first in the min-heap.
        item = Item(call_next, context, future=Future(), priority=-priority)
        await self.queue.put(item)
        return await item.future

    async def startup(self, _: AppType) -> None:
        """Spawn the worker tasks that drain the queue."""
        tasks = [asyncio.create_task(self._worker()) for _ in range(self.workers)]
        self._workers = tuple(tasks)

    async def shutdown(self) -> None:
        """Stop the pool by feeding one stop sentinel per worker, then wait."""
        for _ in range(self.workers):
            await self.queue.put(_STOP_ITEM)
        await asyncio.gather(*self._workers)
        self._workers = UNSET

    async def _worker(self) -> None:
        # Consume items until the stop sentinel arrives.
        while True:
            item = await self.queue.get()
            if item is _STOP_ITEM:
                self.queue.task_done()
                return

            call_next, context, fut, _ = item
            try:
                fut.set_result(await call_next(context))
            except Exception as exc:  # noqa: BLE001
                fut.set_exception(exc)
            finally:
                self.queue.task_done()

jobify.exceptions

Custom exceptions for the jobify framework.

This module defines specific exceptions that the jobify scheduling system can raise. These exceptions provide more detailed information about errors and guidance on how to handle common scheduling scenarios.

ApplicationStateError

Bases: BaseJobifyError

Raised when app is in wrong state for the requested operation.

Source code in src/jobify/_internal/exceptions.py
class ApplicationStateError(BaseJobifyError):
    """Raised when app is in wrong state for the requested operation."""

    def __init__(
        self,
        *,
        operation: str,
        reason: str,
        solution: str,
    ) -> None:
        """Record the failed operation and build an explanatory message."""
        self.operation: str = operation
        self.reason: str = reason
        self.solution: str = solution

        lines = (
            f"Cannot perform operation '{operation}'.",
            f"  Reason: {reason}",
            f"  Resolution: {solution}",
        )
        super().__init__("\n".join(lines))

BaseJobifyError

Bases: Exception

Source code in src/jobify/_internal/exceptions.py
class BaseJobifyError(Exception):
    """Common base class for all jobify-specific errors."""

DuplicateJobError

Bases: RuntimeError

Raised when a job is scheduled with an ID that is already in use.

Source code in src/jobify/_internal/exceptions.py
class DuplicateJobError(RuntimeError):
    """Raised when a job is scheduled with an ID that is already in use."""

    def __init__(self, job_id: str) -> None:
        """Store the conflicting job ID and build the error message."""
        self.job_id: str = job_id
        super().__init__(
            f"Job with ID {job_id!r} is already scheduled. "
            "Use 'replace=True' if you want to update the existing job."
        )

JobFailedError

Bases: BaseJobifyError

Source code in src/jobify/_internal/exceptions.py
class JobFailedError(BaseJobifyError):
    """Raised when a job run ends in failure."""

    def __init__(self, job_id: str, reason: str) -> None:
        """Keep the job ID and failure reason, then build the message."""
        self.job_id: str = job_id
        self.reason: str = reason
        message = f"job_id: {job_id}, failed_reason: {reason}"
        super().__init__(message)

JobNotCompletedError

Bases: BaseJobifyError

Raised when trying to access result of incomplete job.

Source code in src/jobify/_internal/exceptions.py
class JobNotCompletedError(BaseJobifyError):
    """Raised when trying to access result of incomplete job."""

    # Default message shown when no custom message is supplied.
    _DEFAULT_MSG = (
        "Job result is not ready yet, "
        "please use .wait() and then you can use .result"
    )

    def __init__(self, msg: str = _DEFAULT_MSG) -> None:
        super().__init__(msg)

JobTimeoutError

Bases: BaseJobifyError

Raised when job execution exceeds the configured timeout.

Source code in src/jobify/_internal/exceptions.py
class JobTimeoutError(BaseJobifyError):
    """Raised when job execution exceeds the configured timeout."""

    def __init__(self, job_id: str, timeout: float) -> None:
        """Record the job ID and timeout, then build the error message."""
        self.job_id: str = job_id
        self.timeout: float = timeout
        super().__init__(
            f"job_id: {job_id} exceeded timeout of {timeout} seconds. "
            "Job execution was interrupted."
        )

RouteAlreadyRegisteredError

Bases: BaseJobifyError

A route with this name has already been registered.

Source code in src/jobify/_internal/exceptions.py
class RouteAlreadyRegisteredError(BaseJobifyError):
    """A route with this name has already been registered."""

    def __init__(self, name: str) -> None:
        """Build the duplicate-registration message for *name*."""
        super().__init__(
            f"A route with the name {name!r} has already been registered."
        )