Jobify Configuration¶
These arguments are passed to the Jobify class constructor.
```python
import asyncio
from collections.abc import AsyncIterator
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from contextlib import asynccontextmanager
from zoneinfo import ZoneInfo

from adaptix import Retort

from jobify import Jobify
from jobify.crontab import create_crontab
from jobify.router import RootRoute
from jobify.serializers import JSONSerializer
from jobify.storage import SQLiteStorage


@asynccontextmanager
async def mylifespan(_: Jobify) -> AsyncIterator[None]:
    yield None


app = Jobify(
    tz=ZoneInfo("UTC"),
    dumper=Retort(),
    loader=Retort(),
    storage=SQLiteStorage(),
    lifespan=mylifespan,
    serializer=JSONSerializer(),
    middleware=[],
    outer_middleware=[],
    cron_factory=create_crontab,
    loop_factory=asyncio.get_running_loop,
    exception_handlers={},
    threadpool_executor=ThreadPoolExecutor(max_workers=4),
    processpool_executor=ProcessPoolExecutor(max_workers=3),
    route_class=RootRoute,
)
```
tz¶
- Type: zoneinfo.ZoneInfo | None
- Default: zoneinfo.ZoneInfo("UTC")
Sets the default time zone for the application. Any time-related calculations (such as for cron jobs) will use this time zone unless a different time zone is specified for a specific job.
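The practical effect of the application time zone is that schedules are interpreted as wall-clock times in that zone. A minimal standard-library sketch (using only zoneinfo, not Jobify itself) shows how the same wall-clock time maps to different instants depending on the zone:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# The same wall-clock time ("run at 09:00") refers to different instants
# depending on the application's time zone.
utc_run = datetime(2024, 6, 1, 9, 0, tzinfo=ZoneInfo("UTC"))
ny_run = datetime(2024, 6, 1, 9, 0, tzinfo=ZoneInfo("America/New_York"))

# New York observes DST in June (UTC-4), so 09:00 there is 13:00 UTC.
print(ny_run.astimezone(ZoneInfo("UTC")).hour)        # 13
print((ny_run - utc_run).total_seconds() / 3600)      # 4.0
```

A cron job defined as "09:00 daily" will therefore fire four hours later in real time under `tz=ZoneInfo("America/New_York")` than under the default UTC.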
dumper and loader¶
- Type: Dumper | None, Loader | None
- Default: DummyDumper, DummyLoader
These are hooks for integrating with external type systems or advanced serialization libraries, such as adaptix and pydantic.
- dumper: A function or object that converts complex data types into a format that can be stored in a file or database and is not handled by the main serializer.
- loader: A function or object that reads the data from a file or database and converts it back into Python objects.
By default, they do nothing.
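Conceptually, a dumper/loader pair is just a round-trip between rich Python objects and plain storable data. The sketch below illustrates the idea with a dataclass and plain functions; the `Report`, `dump_report`, and `load_report` names are illustrative, not part of Jobify's Dumper/Loader protocol:

```python
from dataclasses import asdict, dataclass


@dataclass
class Report:
    name: str
    pages: int


# A dumper turns rich objects into plain, storable data...
def dump_report(report: Report) -> dict:
    return asdict(report)


# ...and a loader reconstructs the objects from that data.
def load_report(data: dict) -> Report:
    return Report(**data)


raw = dump_report(Report(name="q1", pages=12))
print(raw)               # {'name': 'q1', 'pages': 12}
print(load_report(raw))  # Report(name='q1', pages=12)
```

Libraries like adaptix (via Retort, as in the example at the top of this page) or pydantic provide this conversion generically for arbitrary typed models.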
storage¶
- Type: Storage | Literal[False] | None
- Default: None
Configures the persistence layer for jobs.
- None (default): Uses SQLiteStorage, which saves jobs to a local SQLite database file (jobify.db). This is the recommended option for single-node deployments.
- False: Uses DummyStorage, an in-memory storage. Jobs are not saved and will be lost if the application is restarted.
- Custom storage: You can provide an instance of a class that implements the jobify._internal.storage.abc.Storage abstract base class to customize the persistence logic (for example, using a different database).
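The three options side by side, as a configuration fragment (not meant to be run as-is):

```python
from jobify import Jobify
from jobify.storage import SQLiteStorage

app_default = Jobify(storage=None)            # persists jobs to jobify.db
app_memory = Jobify(storage=False)            # in-memory only, lost on restart
app_explicit = Jobify(storage=SQLiteStorage())  # same as the default, but explicit
```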
lifespan¶
- Type: Lifespan[Jobify] | None
- Default: None
An async context manager for executing code during application startup and shutdown, working just like FastAPI's lifespan events. This is useful for managing resources, such as database connections or external clients.
You can yield a dictionary to populate the application's state.
This state is then accessible through app.state, for example app.state.database or app.state.cache.
Example:
```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, NewType, TypedDict

from jobify import Jobify

Cache = NewType("Cache", dict[str, Any])
Database = NewType("Database", dict[str, Any])


class State(TypedDict):
    pool: Database
    cache: Cache


@asynccontextmanager
async def lifespan(app: Jobify) -> AsyncIterator[State]:
    print("Application starting up!")
    # e.g., initialize database connections
    db = Database({})
    # The yielded dictionary will be stored in app.state
    yield State(pool=db, cache=Cache({}))
    print("Application shutting down!")
    # e.g., close connections gracefully


async def main() -> None:
    async with Jobify(lifespan=lifespan) as app:
        # app.state.pool and app.state.cache are now accessible.
        print("Application running with state:", app.state)


if __name__ == "__main__":
    asyncio.run(main())
```
serializer¶
- Type: Serializer | None
- Default: ExtendedJSONSerializer or JSONSerializer
The primary serializer for converting job data, such as function arguments, into a storable format.
- If dumper and loader are not specified, the default is ExtendedJSONSerializer, which supports common types such as dataclass.
- Otherwise, it falls back to the simpler JSONSerializer.
- You can provide your own custom serializer instance that implements the jobify.serializers.Serializer interface.
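The distinction between the two defaults can be illustrated with the standard json module; this is only a sketch of the idea, not Jobify's actual serializer implementation:

```python
import json
from dataclasses import asdict, dataclass, is_dataclass


@dataclass
class Payload:
    user_id: int


# Plain JSON (like JSONSerializer) rejects non-JSON types such as dataclasses:
try:
    json.dumps(Payload(user_id=7))
except TypeError:
    print("dataclass is not JSON-serializable out of the box")


# An "extended" serializer adds a fallback for such types:
def extended_default(obj):
    if is_dataclass(obj):
        return asdict(obj)
    raise TypeError(f"Cannot serialize {type(obj)!r}")


print(json.dumps(Payload(user_id=7), default=extended_default))  # {"user_id": 7}
```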
middleware¶
- Type: Sequence[BaseMiddleware] | None
- Default: None
A sequence of middleware applied globally to all jobs. Middleware can intercept job execution and add features such as automatic retries, timeouts, or custom logging.
Example:
```python
import asyncio
import logging
from typing import Any

from jobify import JobContext, Jobify
from jobify.middleware import BaseMiddleware, CallNext

logging.basicConfig(level=logging.INFO)


class LoggingMiddleware(BaseMiddleware):
    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
        logging.info("Job %s is starting.", context.job.id)
        try:
            return await call_next(context)
        finally:
            logging.info("Job %s has finished.", context.job.id)


app = Jobify(middleware=[LoggingMiddleware()])


@app.task
def my_task() -> None:
    print("Hello from my_task!")


async def main() -> None:
    async with app:
        job = await my_task.schedule().delay(0.1)
        await job.wait()  # wait for the task to run


if __name__ == "__main__":
    asyncio.run(main())
```
When you run this code, you will see output similar to this:
```
INFO:root:Job 1922a07f509e4ae098bd8ff4ebca2830 is starting.
Hello from my_task!
INFO:root:Job 1922a07f509e4ae098bd8ff4ebca2830 has finished.
```
A crucial feature of middleware is control over the execution chain.
Calling call_next(context) passes control to the next middleware in the chain, or runs the job function itself if this is the last step.
If a middleware returns a value without calling call_next, the job (and any remaining middleware) is skipped gracefully.
This enables powerful features, such as custom authorization checks that prevent a job from running.
For example, this middleware will skip any job that has skip: True in its metadata.
```python
class SkipMiddleware(BaseMiddleware):
    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
        if context.route_options["metadata"].get("skip") is True:
            logging.warning("Job %s was skipped by middleware.", context.job.id)
            return None  # Do not call call_next, stopping execution
        return await call_next(context)
```
outer_middleware¶
- Type: Sequence[BaseOuterMiddleware] | None
- Default: None
A sequence of middleware that intercepts the scheduling process itself.
Unlike regular middleware, which runs when a job executes, outer middleware runs when you call .schedule().at/delay(...) or .cron(...).
This allows you to:
- Modify job arguments before they are saved or scheduled.
- Prevent a job from being scheduled under certain conditions.
- Perform additional actions (like logging) when the job is scheduled.
It receives an OuterContext object that contains information about the scheduling request (trigger, arguments, etc.).
Execution Logic
By default, the outer middleware is only executed when a job is newly created or when its configuration (schedule, arguments) has changed. This prevents unnecessary side effects, such as spamming logs or metrics, when the application is restarted or when the same schedule is applied idempotently.
If you need the middleware to run every time .schedule() is called, regardless of whether the job has changed, you can pass force=True as an argument.
Example:
```python
import asyncio

from jobify import Jobify, OuterContext
from jobify.middleware import BaseOuterMiddleware, CallNextOuter


class ScheduleLoggerMiddleware(BaseOuterMiddleware):
    async def __call__(
        self, call_next: CallNextOuter, context: OuterContext
    ) -> asyncio.Handle:
        print(f"Scheduling job {context.job.id} with trigger: {context.trigger}")
        # You can inspect context.arguments, context.trigger, etc.
        return await call_next(context)


app = Jobify(outer_middleware=[ScheduleLoggerMiddleware()])
```
cron_factory¶
- Type: CronFactory | None
- Default: jobify.crontab.create_crontab
A factory function for parsing cron expression strings. The default supports standard cron syntax, plus an optional field for seconds.
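To make the "optional seconds field" concrete, here is a small standard-library sketch of how a factory might distinguish 5-field from 6-field expressions. This is illustrative only; it is not how jobify.crontab.create_crontab is implemented, and split_cron_fields is a hypothetical helper:

```python
def split_cron_fields(expression: str) -> dict[str, str]:
    """Split a cron expression, allowing an optional leading seconds field."""
    fields = expression.split()
    if len(fields) == 6:    # seconds included
        names = ["second", "minute", "hour", "day", "month", "weekday"]
    elif len(fields) == 5:  # standard 5-field syntax
        names = ["minute", "hour", "day", "month", "weekday"]
    else:
        raise ValueError(f"Expected 5 or 6 fields, got {len(fields)}")
    return dict(zip(names, fields))


print(split_cron_fields("*/5 * * * *")["minute"])     # */5
print(split_cron_fields("30 */5 * * * *")["second"])  # 30
```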
loop_factory¶
- Type: LoopFactory
- Default: asyncio.get_running_loop
A callable that returns an asyncio event loop for the application to use.
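Any zero-argument callable that returns an event loop qualifies. The default, asyncio.get_running_loop, reuses the loop the application was started from; an alternative factory might create a fresh loop instead, as in this standard-library sketch:

```python
import asyncio


def fresh_loop() -> asyncio.AbstractEventLoop:
    # Alternative factory: create a brand-new loop instead of
    # reusing the currently running one (the default behavior).
    return asyncio.new_event_loop()


loop = fresh_loop()
result = loop.run_until_complete(asyncio.sleep(0, result="ok"))
loop.close()
print(result)  # ok
```

The same shape would let you plug in a third-party loop implementation (for example, a uvloop factory) without changing the rest of the configuration.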
exception_handlers¶
- Type: MappingExceptionHandlers | None
- Default: None
A dictionary that maps exception types to custom error handling functions, allowing for more fine-grained and customized error handling when jobs fail.
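The general idea behind type-keyed handler maps can be sketched with the standard library: walk the exception's class hierarchy and pick the most specific registered handler. Jobify's actual dispatch logic may differ, and the handler names below are illustrative:

```python
def handle_timeout(exc: Exception) -> str:
    return f"retrying after timeout: {exc}"


def handle_any(exc: Exception) -> str:
    return f"logged: {exc}"


# Exception types mapped to handler callables, most specific first.
handlers = {TimeoutError: handle_timeout, Exception: handle_any}


def dispatch(exc: Exception) -> str:
    # Walk the exception's MRO so a subclass matches its own handler
    # before falling back to a more general one.
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls](exc)
    raise exc


print(dispatch(TimeoutError("job took too long")))  # handled by handle_timeout
print(dispatch(ValueError("bad argument")))         # falls back to handle_any
```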
threadpool_executor and processpool_executor¶
- Type: ThreadPoolExecutor | None, ProcessPoolExecutor | None
- Default: None
Executors for running tasks in separate threads or processes.
- threadpool_executor: Runs synchronous, I/O-bound functions without blocking the main asyncio event loop.
- processpool_executor: Runs synchronous, CPU-intensive functions in a separate process, avoiding both blocking the main event loop and contention on the Global Interpreter Lock (GIL).
If not specified, Jobify will automatically create and manage executors as needed.
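The underlying mechanism is asyncio's run_in_executor: the blocking call runs in a pool worker while the event loop stays responsive. A minimal standard-library demonstration (independent of Jobify):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io() -> str:
    time.sleep(0.1)  # simulate a blocking call (file or network I/O)
    return "done"


async def main() -> str:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # The blocking call runs in a worker thread, so the
        # event loop stays free to schedule other jobs.
        return await loop.run_in_executor(pool, blocking_io)


print(asyncio.run(main()))  # done
```

A ProcessPoolExecutor is used the same way, but the work is serialized to a child process, which is what sidesteps the GIL for CPU-bound functions.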
route_class¶
- Type: type[RootRoute] | None
- Default: jobify.router.RootRoute
The route_class parameter allows you to specify a custom class for handling tasks.
This is an advanced feature that can be used to integrate with dependency injection frameworks or customize how tasks are executed.
By default, tasks are handled by the jobify.router.RootRoute class.
However, you can create a subclass of this class and override its methods to change the behavior of tasks.
If a custom route_class is specified, it will be used globally for all tasks and routers in the Jobify application.