
Jobify

Jobify is a powerful asynchronous job scheduling and management framework for Python. It allows you to define and schedule background jobs with an intuitive decorator-based API, similar to modern web frameworks like FastAPI.

Key Features

  • Precision: No polling! Uses native asyncio timers for sub-millisecond accuracy and zero idle CPU usage.
  • Scheduling: Run jobs immediately, with a delay, at a specified time, or using Cron expressions (second-level precision supported).
  • Storage: Built-in SQLite ensures scheduled jobs persist through application restarts.
  • Routing: Organize tasks with JobRouter, similar to FastAPI or Aiogram.
  • Context Injection: Inject application state or custom dependencies directly into your tasks.
  • Middlewares: Powerful interceptors for both job execution and the scheduling process.
  • Exception Handlers: Hierarchical error management at the task, router, or global level.
  • Lifespan Support: Manage startup and shutdown events, just like in FastAPI.
  • Job Control: Full control over jobs — wait for completion, cancel tasks, or check results with ease.
  • Concurrency: Supports asyncio, ThreadPoolExecutor, and ProcessPoolExecutor for efficient task handling.
  • Adapters: Many different database adapters.
  • Distributed task queue: Soon.
  • Many different serializers: Soon.

Comparison

You might have seen other libraries like APScheduler, Celery, or Taskiq. Below is a comparison of features to help you decide if Jobify fits your needs.

Feature name Jobify Taskiq APScheduler (v3) Celery
Event-driven Scheduling ✅ (Low-level timer) ❌ (Polling/Loop) ❌ (Interval) ❌ (Polling/Loop)
Async Native (asyncio) ❌ (Sync mostly)
Context Injection
FastAPI-style Routing
Middleware Support ❌ (Events only) ❌ (Signals)
Lifespan Support
Exception Handlers ✅ (Hierarchical)
Job Cancellation
Cron Scheduling ✅ (Seconds level) ✅ (Minutes)
Misfire Policy
Run Modes (Thread/Process)
Rich Typing Support
Zero-config Persistence ✅ (SQLite default) ❌ (Needs Broker) ❌ (Needs Broker)
Broker-backend execution ❌ (soon)

Why Jobify?

Unlike many frameworks that poll, running a while True loop that repeatedly compares the current time against scheduled tasks, Jobify uses the asyncio event loop's low-level loop.call_at API.

  1. Efficiency: The scheduler does not consume CPU cycles if there are no tasks to process.
  2. Precision: Tasks are triggered precisely by the internal timer of the event loop, ensuring sub-millisecond accuracy and avoiding the "jitter" that can be associated with sleep intervals.
  3. Native: It works in harmony with OS-level event notification systems (epoll/kqueue).
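
The timer-based approach described above can be sketched with the standard library alone. This is a minimal illustration of `loop.call_at`, not Jobify's internal code; the `run_after` helper is a hypothetical name introduced for this example.

```python
import asyncio


async def run_after(delay: float) -> float:
    """Fire a callback `delay` seconds from now via loop.call_at.

    Returns the difference between the actual firing time and the
    deadline, in seconds of loop time.
    """
    loop = asyncio.get_running_loop()
    fired = loop.create_future()
    deadline = loop.time() + delay  # loop.time() is a monotonic clock

    # No `while True` loop here: the event loop sleeps in its selector
    # until the deadline and then invokes the callback.
    loop.call_at(deadline, lambda: fired.set_result(loop.time() - deadline))
    return await fired


if __name__ == "__main__":
    drift = asyncio.run(run_after(0.05))
    print(f"callback fired {drift * 1000:.3f} ms from its deadline")
```

While the coroutine awaits `fired`, nothing wakes up to check the clock; the only wake-up is the one the event loop schedules for the deadline itself.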

The Precision vs. Polling Trade-off

Jobify consciously avoids polling in order to achieve maximum efficiency and sub-millisecond precision. This architectural decision means that the scheduler is sensitive to significant changes in the operating system's clock. For more information on this trade-off and why it is important, please see System Time and Scheduling.
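
To see why a timer-based scheduler is sensitive to clock changes, consider how a wall-clock target is typically turned into an event-loop deadline. The sketch below is an assumption about the general technique, not Jobify's actual implementation; `to_loop_deadline` is a hypothetical helper.

```python
import asyncio
from datetime import datetime, timedelta, timezone


def to_loop_deadline(run_at: datetime, wall_now: datetime, loop_now: float) -> float:
    """Translate a wall-clock target into the event loop's monotonic time."""
    return loop_now + (run_at - wall_now).total_seconds()


async def main() -> None:
    loop = asyncio.get_running_loop()
    wall_now = datetime.now(tz=timezone.utc)
    run_at = wall_now + timedelta(seconds=30)

    deadline = to_loop_deadline(run_at, wall_now, loop.time())
    handle = loop.call_at(deadline, print, "job fired")

    # The offset was computed once, from the wall clock as it was at
    # scheduling time. If the OS clock later jumps (a manual change or a
    # large NTP correction), this timer still fires after 30 s of
    # monotonic time, not at the new wall-clock `run_at`.
    print(f"fires in {handle.when() - loop.time():.1f} s of loop time")
    handle.cancel()  # cancel so the example exits immediately


if __name__ == "__main__":
    asyncio.run(main())
```

A polling scheduler would notice the clock jump on its next check; a timer armed this way would not unless it is rescheduled.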

Quick Start

Installation

pip install jobify

Basic Usage

Here is a simple example showing how to define a task and schedule it.

import asyncio
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from jobify import Jobify

UTC = ZoneInfo("UTC")
# 1. Initialize Jobify
app = Jobify(tz=UTC)


# 2. Define a recurring task (this 7-field cron expression runs every second)
@app.task(cron="* * * * * * *")
async def my_cron() -> None:
    print("Hello! cron running every second")


# 3. Define a regular task that can be pushed or scheduled on demand
@app.task
def my_job(name: str) -> None:
    now = datetime.now(tz=UTC)
    print(f"Hello, {name}! job running at: {now!r}")


async def main() -> None:
    # 4. Run the Jobify application context
    async with app:
        # Run immediately in the background.
        job = await my_job.push("Alex")

        # Schedule a one-time job at a specific time.
        run_next_day = datetime.now(tz=UTC) + timedelta(days=1)
        job_at = await my_job.schedule("Connor").at(run_next_day)

        # Schedule a one-time job after a delay.
        job_delay = await my_job.schedule("Sara").delay(20)

        # Start a dynamic cron job.
        job_cron = await my_cron.schedule().cron(
            "* * * * *",
            job_id="dynamic_cron_id",
        )

        await job_at.wait()
        await job_delay.wait()
        await job_cron.wait()
        # You can also use `await app.wait_all()` to wait for all
        # currently running jobs to complete.
        # Note: If there are cron jobs that run indefinitely, like `my_cron`,
        # `app.wait_all()` will block forever unless a timeout is set.
        # await app.wait_all()


if __name__ == "__main__":
    asyncio.run(main())
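
The note about `app.wait_all()` applies to any await that may never complete. One generic way to bound such a wait is the standard `asyncio.wait_for`. The sketch below uses a hypothetical `wait_forever` coroutine as a stand-in for an open-ended wait; it makes no assumption about how `wait_all()` itself accepts a timeout.

```python
import asyncio


async def wait_forever() -> None:
    """Hypothetical stand-in for a wait that never finishes on its own."""
    await asyncio.Event().wait()


async def wait_with_timeout(timeout: float) -> bool:
    """Return True if the wait finished, False if the timeout expired."""
    try:
        await asyncio.wait_for(wait_forever(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True


if __name__ == "__main__":
    finished = asyncio.run(wait_with_timeout(0.1))
    print("finished" if finished else "timed out")
```

Note that `asyncio.wait_for` cancels the awaited coroutine when the timeout expires, so this pattern suits waits that are safe to abandon.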