Scheduling Jobs¶
You can schedule tasks in two main ways: using recurring cron expressions or dynamically at runtime.
Cron Expressions¶
Jobify uses the crontab library to parse and schedule jobs from cron expressions. This provides a flexible and powerful way to define recurring tasks.
Syntax¶
A cron expression is a string of fields that describe a schedule. The crontab library supports standard 5-field cron expressions, but also expressions with 6 or 7 fields for second-level precision and year specification.
The general representation is:
Expression Formats¶
The crontab library supports multiple expression formats, which are interpreted as follows:
-
5 fields:
MINUTES HOURS DAY_OF_MONTH MONTH DAY_OF_WEEK- Example:
* * * * *- runs every minute.
- Example:
-
6 fields:
MINUTES HOURS DAY_OF_MONTH MONTH DAY_OF_WEEK YEAR- Example:
0 9 * * 1 2024- runs at 9:00 AM every Monday in 2024.
- Example:
-
7 fields:
SECONDS MINUTES HOURS DAY_OF_MONTH MONTH DAY_OF_WEEK YEAR- This is the format you must use for second-level precision.
- Example:
*/15 * * * * * *- runs every 15 seconds.
Warning
To schedule a job with second-level precision (e.g., every 15 seconds), you must use a 7-field expression. A 6-field expression will be interpreted as including a YEAR field, not a SECONDS field.
Fields¶
| Field | Allowed Values | Allowed Special Characters |
|---|---|---|
seconds |
0-59 | *, /, ,, - |
minutes |
0-59 | *, /, ,, - |
hours |
0-23 | *, /, ,, - |
day_of_month |
1-31 | *, /, ,, -, ?, L |
month |
1-12 or JAN-DEC | *, /, ,, - |
day_of_week |
0-6 or SUN-SAT | *, /, ,, -, ?, L |
year |
1970-2099 | *, /, ,, - |
Special Characters¶
*(Asterisk): Selects all possible values for a field. For example,*in theminutesfield means "every minute".,(Comma): Used to specify a list of values. For example, "1, 5, 10" in the "minutes" field means "at minutes 1, 5 and 10".-(Dash): Specifies a range of values. For example,1-5in theday_of_weekfield means "Monday to Friday."/(Slash): Specifies a step value. For example,/15in thesecondsfield means "every 15 seconds."?(Question Mark): Used inday_of_monthorday_of_weekto indicate "no specific value", which is useful when you want to specify one of these fields but not the other.L(Last): has different meanings depending on the context. Inday_of_monthit refers to the last day of the month, while inday_of_weekit means the last day of the week, which is Saturday.
Predefined Expressions (Aliases)¶
You can also use one of the following predefined cron expressions:
| Expression | Equivalent | Description |
|---|---|---|
@yearly |
0 0 1 1 * |
Run once a year, at midnight on Jan 1st. |
@monthly |
0 0 1 * * |
Run once a month, at midnight on the 1st. |
@weekly |
0 0 * * 0 |
Run once a week, at midnight on Sunday. |
@daily |
0 0 * * * |
Run once a day, at midnight. |
@hourly |
0 * * * * |
Run once an hour, at the beginning. |
For more detailed information, please refer to the official crontab library documentation.
The Cron Object¶
You can use the Cron class to have more control over the scheduling of the job (e.g., limiting the number of runs or handling misfires).
from jobify import Cron, MisfirePolicy
from datetime import datetime
Cron(
"0 18 * * 1-5",
max_runs=100,
max_failures=5,
misfire_policy=MisfirePolicy.ALL,
start_date=datetime(2027, 1, 1), # The task will start on January 1, 2027.
)
The Cron class has the following properties:
- expression (
str): The cron expression as a string. - max_runs (
int, default:INFINITY (-1)): The maximum number of times a job can run. The run count is persisted in the database, so it survives application restarts. When the limit is reached, the job is automatically removed from the schedule. - max_failures (
int, default:10): The maximum number of consecutive failed attempts allowed before a job is permanently stopped and no longer scheduled. This value must be greater than or equal to 1. - misfire_policy (
MisfirePolicy | GracePolicy, default:MisfirePolicy.ONCE): Determines how to deal with missed schedules (for example, if the application is unavailable).MisfirePolicy.ALL: Run all missed executions immediately.MisfirePolicy.SKIP: If there were missed executions, then skip them.MisfirePolicy.ONCE: If there were missed executions, run only once.MisfirePolicy.GRACE(timedelta): If the missed schedule is within the specified grace period, please start it immediately.
- start_date (
datetime | None, default:None): The scheduled start time is used to anchor the first execution of the job to a specific datetime.
Dynamic Scheduling¶
In addition to cron jobs that are defined at the application level, users can also schedule tasks to run at a specific time or after a delay. They can even dynamically create new cron schedules. This is useful for one-time tasks or tasks that are triggered by the application's logic.
To dynamically schedule a task, you need to create a ScheduleBuilder. You can do this by calling the .schedule() method on the task function. The arguments that you pass to .schedule() will be used when the task runs.
push (Immediate Execution)¶
The .push() method is the fastest way to offload a task to the background. It schedules the task to run as soon as possible, similar to the .delay(0), but more concise.
Unlike the .schedule() method, which requires you to create the schedule first, the .push() method accepts your function arguments directly.
Persistence
Just like other scheduling methods, tasks created using .push() are automatically saved to storage (unless the durable=False parameter is set on the task). This ensures that even if the application crashes immediately after the task is pushed, it will be picked up and processed when the application restarts.
example:
import asyncio
from typing import Any
from jobify import Jobify
app = Jobify()
@app.task(durable=False, retry=3)
def process_data(data: dict[str, Any]) -> None:
print(f"Processing: {data}")
async def main() -> None:
async with app:
# Offload execution immediately
job = await process_data.push({"id": 1, "value": "test"})
# You can still wait the result for it if needed
await job.wait()
asyncio.run(main())
cron¶
To dynamically schedule a recurring task using a cron expression, use the .cron() method.
# Schedules a recurring task.
schedule(*args, **kwargs).cron(
cron: str | Cron, # The cron expression or `Cron` object.
*,
job_id: str, # Required unique identifier for the job.
replace: bool = False, # If True, updates the existing job.
force: bool = False, # Force scheduling even if the job already exists with the same parameters.
)
Outer Middleware Execution
By default, the Outer Middleware is only executed when a new job is created or when an existing job's parameters are updated, such as a new schedule or different arguments. If you try to schedule a job with identical configuration to an existing job, the outer middleware will not be executed to prevent unnecessary side effects like logging or metric spam.
To ensure that the outer middleware always runs, set force=True.
Idempotency during Replacement
- When using replace=True for cron jobs, the scheduler will preserve the current execution progress if the start_date has not changed in your code.
- This prevents "double-firing" or schedule resets during app redeploys.
- If the start_date is modified, however, the schedule will be "hard-reset" to the new date.
example:
import asyncio
from jobify import Jobify
app = Jobify()
@app.task
def cleanup_logs() -> None:
print("Cleaning up logs...")
@app.task(cron="@daily")
def daily_report() -> None:
print("Generating daily report...")
async def main() -> None:
async with app:
# Schedule cleanup every 5 minutes
job = await cleanup_logs.schedule().cron(
cron="*/5 * * * *",
job_id="cleanup_task_dynamic",
)
# Keep the app running...
await app.wait_all()
asyncio.run(main())
delay¶
To run a task after a specified delay, use the delay() method of the builder.
# Schedules the task to run after a specified number of seconds.
schedule(*args, **kwargs).delay(
seconds: float, # The delay in seconds.
*,
job_id: str | None = None, # Optional unique identifier for the job.
now: datetime | None = None, # Optional reference datetime.
replace: bool = False, # If True, updates the existing job.
force: bool = False, # Force scheduling even if the job already exists with the same parameters.
)
import asyncio
from jobify import Jobify
app = Jobify()
@app.task
def send_email(to: str, subject: str) -> None:
print(f"Sending email to {to} about {subject}")
async def main() -> None:
async with app:
# Schedule the email to be sent in 60 seconds
job = await send_email.schedule(
to="user@example.com",
subject="Hello",
).delay(seconds=60)
await job.wait()
asyncio.run(main())
at¶
To run a task at a specific datetime, use the .at() method.
# Schedules the task to run at the specified `datetime`.
schedule(*args, **kwargs).at(
at: datetime, # The execution time.
*,
job_id: str | None = None, # Optional unique identifier for the job.
replace: bool = False, # If True, updates the existing job.
force: bool = False, # Force scheduling even if the job already exists with the same parameters.
)
example:
import asyncio
from datetime import datetime, timedelta
from jobify import Jobify
app = Jobify()
@app.task
def generate_report(report_id: int) -> None:
print(f"Generating report {report_id}")
async def main() -> None:
async with app:
# Schedule the report to be generated 10 minutes from now
run_time = datetime.now(app.configs.tz) + timedelta(minutes=10)
job = await generate_report.schedule(report_id=123).at(at=run_time)
# Keep the app running to allow the job to execute
await job.wait()
asyncio.run(main())
Replacing existing jobs
By default, if you try to schedule a job with a job ID that is already in use, a DuplicateJobError will be raised.
To avoid this, you can set replace=True when scheduling a job using any of the following methods: cron, at, or delay. This will automatically cancel the existing job and schedule a new one in its place.
For cron jobs, setting replace=True will also preserve the scheduling offset, ensuring consistency in execution.