Jobify Configuration

These arguments are passed to the Jobify constructor.

import asyncio
from collections.abc import AsyncIterator
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from contextlib import asynccontextmanager
from zoneinfo import ZoneInfo

from adaptix import Retort

from jobify import Jobify
from jobify.crontab import create_crontab
from jobify.router import RootRoute
from jobify.serializers import JSONSerializer
from jobify.storage import SQLiteStorage


@asynccontextmanager
async def mylifespan(_: Jobify) -> AsyncIterator[None]:
    yield None


app = Jobify(
    tz=ZoneInfo("UTC"),
    dumper=Retort(),
    loader=Retort(),
    storage=SQLiteStorage(),
    lifespan=mylifespan,
    serializer=JSONSerializer(),
    middleware=[],
    outer_middleware=[],
    cron_factory=create_crontab,
    loop_factory=asyncio.get_running_loop,
    exception_handlers={},
    threadpool_executor=ThreadPoolExecutor(max_workers=4),
    processpool_executor=ProcessPoolExecutor(max_workers=3),
    route_class=RootRoute,
)

tz

  • Type: zoneinfo.ZoneInfo | None
  • Default: zoneinfo.ZoneInfo("UTC")

Sets the default time zone for the application. Any time-related calculations (such as for cron jobs) will use this time zone unless a different time zone is specified for a specific job.
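
To illustrate why the default time zone matters for schedule calculations, here is a minimal standard-library sketch. It does not use Jobify; next_daily_run is a hypothetical helper, and the fixed "now" value is chosen only to make the result reproducible.

```python
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo


def next_daily_run(now: datetime, at: time, tz: ZoneInfo) -> datetime:
    """Return the next occurrence of wall-clock time `at` in zone `tz`.

    Hypothetical helper for illustration; it is not part of Jobify.
    """
    local_now = now.astimezone(tz)
    candidate = datetime.combine(local_now.date(), at, tzinfo=tz)
    if candidate <= local_now:
        # Today's slot has already passed in this zone, so use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 15, 12, 0, tzinfo=ZoneInfo("UTC"))
utc_run = next_daily_run(now, time(9, 0), ZoneInfo("UTC"))
tokyo_run = next_daily_run(now, time(9, 0), ZoneInfo("Asia/Tokyo"))
```

The same "09:00 every day" rule resolves to two different instants, nine hours apart, which is why a job-level time zone override can change when a job fires.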

dumper and loader

  • Type: Dumper | None, Loader | None
  • Default: DummyDumper, DummyLoader

These are hooks for integrating with external type systems or advanced serialization libraries, such as adaptix and pydantic.

  • dumper: A function or object that converts complex data types, which the main serializer does not handle, into a format that can be stored in a file or database.
  • loader: A function or object that reads the data from a file or database and converts it back into Python objects.

By default, they do nothing.
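
The two roles can be sketched with the standard library alone. The DataclassCodec class and its dump/load method names are assumptions made for illustration (loosely modeled on the dump/load pair that libraries such as adaptix expose); they are not Jobify's actual Dumper and Loader interfaces.

```python
from dataclasses import asdict, dataclass
from typing import Any, TypeVar

T = TypeVar("T")


@dataclass
class Email:
    to: str
    subject: str


class DataclassCodec:
    """Hypothetical stand-in that plays both the dumper and loader roles."""

    def dump(self, data: Any) -> dict[str, Any]:
        # Complex object -> plain structure the serializer can handle.
        return asdict(data)

    def load(self, data: dict[str, Any], tp: type[T]) -> T:
        # Plain structure -> typed Python object.
        return tp(**data)


codec = DataclassCodec()
raw = codec.dump(Email(to="a@example.com", subject="Hi"))
restored = codec.load(raw, Email)
```

The dumper runs before the serializer stores the data, and the loader runs after the serializer reads it back, so the round trip returns an equal Email instance.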

storage

  • Type: Storage | Literal[False] | None
  • Default: None

Configures the persistence layer for jobs.

  • None (default): Uses SQLiteStorage, which saves jobs to a local SQLite database file (jobify.db). This is the recommended option for single-node deployments.
  • False: Uses DummyStorage, which is an in-memory storage. Jobs are not saved and will be lost if the application is restarted.
  • Custom Storage: You can provide an instance of a class that implements the jobify._internal.storage.abc.Storage abstract base class to customize the persistence logic (for example, using a different database).
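
The three-way choice above can be read as the following resolution rule. This is only a sketch of the documented behavior: the classes are empty stand-ins that reuse the names from this page, and resolve_storage is a hypothetical function, not Jobify's internal code.

```python
from typing import Literal, Union


class Storage:
    """Stand-in base class (hypothetical, not Jobify's real ABC)."""


class SQLiteStorage(Storage):
    """Stand-in for the persistent default."""


class DummyStorage(Storage):
    """Stand-in for the in-memory, non-persistent storage."""


def resolve_storage(storage: Union[Storage, Literal[False], None]) -> Storage:
    if storage is None:
        return SQLiteStorage()  # default: persist to a local database
    if storage is False:
        return DummyStorage()  # explicit opt-out of persistence
    return storage  # caller-supplied implementation is used as-is
```

Using False rather than None as the opt-out value keeps "not specified" and "explicitly disabled" distinguishable.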

lifespan

  • Type: Lifespan[Jobify] | None
  • Default: None

An async context manager for executing code during application startup and shutdown, working just like FastAPI's lifespan events. This is useful for managing resources, such as database connections or external clients.

The context manager can yield a dictionary, which becomes the application state. This state is accessible through app.state, for example app.state.pool or app.state.cache.

Example:

import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, NewType, TypedDict

from jobify import Jobify

Cache = NewType("Cache", dict[str, Any])
Database = NewType("Database", dict[str, Any])


class State(TypedDict):
    pool: Database
    cache: Cache


@asynccontextmanager
async def lifespan(app: Jobify) -> AsyncIterator[State]:
    print("Application starting up!")
    # e.g., initialize database connections
    db = Database({})
    # The yielded dictionary will be stored in app.state
    yield State(pool=db, cache=Cache({}))
    print("Application shutting down!")
    # e.g., close connections gracefully


async def main() -> None:
    async with Jobify(lifespan=lifespan) as app:
        # app.state.pool and app.state.cache are now accessible.
        print("Application running with state:", app.state)


if __name__ == "__main__":
    asyncio.run(main())

serializer

  • Type: Serializer | None
  • Default: ExtendedJSONSerializer or JSONSerializer

The primary serializer for converting job data, such as function arguments, into a storable format.

  • If dumper and loader are not specified, the default is ExtendedJSONSerializer, which supports common types such as dataclasses.
  • Otherwise, the default falls back to the simpler JSONSerializer.
  • You can provide your own custom serializer instance that implements the jobify.serializers.Serializer interface.
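
As a rough idea of what a custom serializer might look like, the sketch below converts Python data to compact JSON bytes and back. The method names dumpb and loadb are assumptions for illustration; check the jobify.serializers.Serializer interface for the methods it actually requires.

```python
import json
from typing import Any


class CompactJSONSerializer:
    """Hypothetical serializer: Python data <-> compact UTF-8 JSON bytes."""

    def dumpb(self, data: Any) -> bytes:
        # separators=(",", ":") drops the default whitespace.
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def loadb(self, data: bytes) -> Any:
        return json.loads(data)


serializer = CompactJSONSerializer()
original = {"args": [1, 2], "kwargs": {"retry": True}}
payload = serializer.dumpb(original)
```

Whatever format is chosen, the round trip through the serializer has to return data equal to the input, because stored jobs are rebuilt from it.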

middleware

  • Type: Sequence[BaseMiddleware] | None
  • Default: None

A sequence of middleware that can be applied globally to all jobs. The middleware can intercept job execution and add features such as automatic retries, timeouts, or custom logging.

Example:

import asyncio
import logging
from typing import Any

from jobify import JobContext, Jobify
from jobify.middleware import BaseMiddleware, CallNext

logging.basicConfig(level=logging.INFO)


class LoggingMiddleware(BaseMiddleware):
    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
        logging.info("Job %s is starting.", context.job.id)
        try:
            return await call_next(context)
        finally:
            logging.info("Job %s has finished.", context.job.id)


app = Jobify(middleware=[LoggingMiddleware()])


@app.task
def my_task() -> None:
    print("Hello from my_task!")


async def main() -> None:
    async with app:
        job = await my_task.schedule().delay(0.1)
        await job.wait()  # wait for task to run


if __name__ == "__main__":
    asyncio.run(main())

When you run this code, you will see output similar to this:

INFO:root:Job 1922a07f509e4ae098bd8ff4ebca2830 is starting.
Hello from my_task!
INFO:root:Job 1922a07f509e4ae098bd8ff4ebca2830 has finished.

A key feature of middleware is control over the execution flow. Calling call_next(context) passes control to the next middleware in the chain, or runs the job function itself if this is the last middleware.

If a middleware returns a value without calling call_next, the job and any remaining middleware are skipped without raising an error. This makes it possible to implement checks, such as custom authorization, that prevent a job from running.

For example, this middleware will skip any job that has skip: True in its metadata.

class SkipMiddleware(BaseMiddleware):
    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
        if context.route_options["metadata"].get("skip") is True:
            logging.warning("Job %s was skipped by middleware.", context.job.id)
            return None  # Do not call call_next, stopping execution

        return await call_next(context)
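
The call_next mechanism can be sketched without Jobify. In the simplified model below, a context is a plain dict and build_chain is a hypothetical helper; it also assumes that the first middleware in the list runs outermost, which this page does not confirm for Jobify.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Hypothetical simplified types: here a "context" is just a dict.
Context = dict[str, Any]
CallNext = Callable[[Context], Awaitable[Any]]
Middleware = Callable[[CallNext, Context], Awaitable[Any]]


def build_chain(middlewares: list[Middleware], job: CallNext) -> CallNext:
    """Wrap `job` so the first middleware in the list runs outermost."""
    handler = job
    for mw in reversed(middlewares):
        # Default arguments bind the current mw/handler for this wrapper.
        def wrapped(
            context: Context, mw: Middleware = mw, nxt: CallNext = handler
        ) -> Awaitable[Any]:
            return mw(nxt, context)

        handler = wrapped
    return handler


calls: list[str] = []


async def log_mw(call_next: CallNext, context: Context) -> Any:
    calls.append("log:before")
    try:
        return await call_next(context)
    finally:
        calls.append("log:after")


async def skip_mw(call_next: CallNext, context: Context) -> Any:
    if context.get("skip") is True:
        calls.append("skipped")
        return None  # short-circuit: the job never runs
    return await call_next(context)


async def job(context: Context) -> str:
    calls.append("job")
    return "done"


chain = build_chain([log_mw, skip_mw], job)

if __name__ == "__main__":
    print(asyncio.run(chain({"skip": False})), calls)
```

Running the chain with {"skip": True} records "skipped" instead of "job", while the logging middleware still sees both its before and after steps.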

outer_middleware

  • Type: Sequence[BaseOuterMiddleware] | None
  • Default: None

A sequence of middleware that intercepts the scheduling process itself. Unlike regular middleware, which runs when a job executes, outer middleware runs when you call .schedule().at(...), .schedule().delay(...), or .cron(...).

This allows you to:

  • Modify job arguments before they are saved or scheduled.
  • Prevent a job from being scheduled under certain conditions.
  • Perform additional actions (like logging) when the job is scheduled.

It receives an OuterContext object that contains information about the scheduling request (trigger, arguments, etc.).

Execution Logic

By default, the outer middleware is only executed when a job is newly created or when its configuration (schedule, arguments) has changed. This prevents unnecessary side effects, such as spamming logs or metrics, when the application is restarted or when the same schedule is applied idempotently.

If you need the middleware to run every time .schedule() is called, regardless of whether the job has changed, you can pass force=True as an argument.

Example:

import asyncio
from jobify import Jobify, OuterContext
from jobify.middleware import BaseOuterMiddleware, CallNextOuter

class ScheduleLoggerMiddleware(BaseOuterMiddleware):
    async def __call__(
        self, call_next: CallNextOuter, context: OuterContext
    ) -> asyncio.Handle:
        print(
            f"Scheduling job {context.job.id} with trigger: {context.trigger}"
        )
        # You can inspect context.arguments, context.trigger, etc.
        return await call_next(context)

app = Jobify(outer_middleware=[ScheduleLoggerMiddleware()])

cron_factory

  • Type: CronFactory | None
  • Default: jobify.crontab.create_crontab

A factory function that parses cron expression strings. The default supports standard cron syntax, with an optional field for seconds.
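
In practice, an optional seconds field means an expression has either five or six whitespace-separated fields. The helper below is a hypothetical illustration that only counts fields; it does not parse them, and it makes no assumption about which position the seconds field occupies in Jobify's syntax.

```python
def count_cron_fields(expression: str) -> int:
    """Return 5 for a standard expression or 6 when seconds are included."""
    fields = expression.split()
    if len(fields) not in (5, 6):
        raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}")
    return len(fields)


standard = count_cron_fields("*/5 * * * *")
with_seconds = count_cron_fields("0 */5 * * * *")
```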

loop_factory

  • Type: LoopFactory
  • Default: asyncio.get_running_loop

A callable that returns an asyncio event loop for the application to use.
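
The default, asyncio.get_running_loop, only succeeds while an event loop is running, which fits the pattern of entering the application inside a coroutine. The standard-library sketch below shows both cases; the function names are illustrative.

```python
import asyncio


async def inside_loop() -> bool:
    # Inside a coroutine a loop is running, so the call succeeds.
    loop = asyncio.get_running_loop()
    return loop.is_running()


def outside_loop() -> str:
    # Outside any coroutine there is no running loop to return.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "a loop is running"


if __name__ == "__main__":
    print(asyncio.run(inside_loop()))
    print(outside_loop())
```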

exception_handlers

  • Type: MappingExceptionHandlers | None
  • Default: None

A dictionary that maps exception types to custom error handling functions, allowing for more fine-grained and customized error handling when jobs fail.
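
One common way to dispatch on such a mapping is to walk the exception's class hierarchy and pick the most specific registered handler. The sketch below shows that idea with the standard library; find_handler and the single-argument handler signature are assumptions, and Jobify's own lookup rules and handler signature may differ.

```python
from collections.abc import Callable
from typing import Any, Optional

# Hypothetical handler signature: receives the exception, returns anything.
Handler = Callable[[Exception], Any]


def find_handler(
    handlers: dict[type, Handler], exc: Exception
) -> Optional[Handler]:
    """Return the handler for the most specific matching exception type."""
    for cls in type(exc).__mro__:
        handler = handlers.get(cls)
        if handler is not None:
            return handler
    return None


class InvalidPayload(ValueError):
    """Example subclass with no handler registered for it directly."""


handlers: dict[type, Handler] = {
    ValueError: lambda exc: f"bad value: {exc}",
    Exception: lambda exc: f"unexpected: {exc}",
}
```

With this lookup, an InvalidPayload is routed to the ValueError handler, and any other exception falls through to the generic Exception handler.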

threadpool_executor and processpool_executor

  • Type: ThreadPoolExecutor | None, ProcessPoolExecutor | None
  • Default: None

Executors for running tasks in separate threads or processes.

  • threadpool_executor: Runs synchronous, I/O-bound functions without blocking the main asyncio event loop.
  • processpool_executor: Runs synchronous, CPU-intensive functions in a separate process, so they neither block the main event loop nor contend for the Global Interpreter Lock (GIL).

If not specified, Jobify will automatically create and manage executors as needed.
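
The underlying asyncio mechanism can be shown with the standard library alone. This sketch does not use Jobify: it offloads a blocking function to a thread pool with loop.run_in_executor and counts how often the event loop keeps ticking in the meantime. The function names are illustrative.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(delay: float) -> str:
    time.sleep(delay)  # stands in for a blocking call such as a file read
    return "io done"


async def run_offloaded() -> tuple[str, int]:
    loop = asyncio.get_running_loop()
    ticks = 0

    async def ticker() -> None:
        # Counts event-loop iterations while the blocking call runs.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    with ThreadPoolExecutor(max_workers=2) as pool:
        task = asyncio.create_task(ticker())
        result = await loop.run_in_executor(pool, blocking_io, 0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(run_offloaded()))
```

The same run_in_executor call accepts a ProcessPoolExecutor for CPU-bound work, provided the function and its arguments can be pickled.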

route_class

  • Type: type[RootRoute] | None
  • Default: jobify.router.RootRoute

The route_class parameter allows you to specify a custom class for handling tasks. This is an advanced feature that can be used to integrate with dependency injection frameworks or customize how tasks are executed.

By default, tasks are handled by the jobify.router.RootRoute class. However, you can create a subclass of this class and override its methods to change the behavior of tasks. If a custom route_class is specified, it will be used globally for all tasks and routers in the Jobify application.