Task Configuration
Use @app.task(...) (or @router.task(...)) to configure per-task behavior: schedule policy, retries, timeout, durability, execution mode, metadata, and handlers.
- Scheduling: cron, durable, and task name for stable schedule identity.
- Reliability: retry, timeout, and task-level exception_handlers.
- Execution Control: run_mode and metadata for middleware-driven policies.
- Error Handling: task-specific exception_handlers for localized recovery logic.
Example
from jobify import Cron, Jobify, MisfirePolicy, RunMode

app = Jobify()


@app.task(
    name="reports:daily",
    cron=Cron(
        "0 8 * * 1-5",
        max_runs=100,
        max_failures=5,
        misfire_policy=MisfirePolicy.SKIP,
    ),
    retry=3,
    timeout=300,
    durable=True,
    run_mode=RunMode.PROCESS,
    metadata={"priority": 100, "team": "finance"},
    exception_handlers={TimeoutError: lambda exc, ctx: "timed_out"},
)
def daily_report() -> None:
    ...
name
- Type: str
- Default: auto-generated from the function (module:qualname)
Defines the unique task/route name. Set it explicitly when you need stable naming across refactors, imports, or deployments.
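To see why an explicit name survives refactors better, here is a minimal sketch of how a module:qualname default could be derived. The helper default_task_name is hypothetical and not part of Jobify; it only assumes the default follows the module:qualname pattern stated above.

```python
def default_task_name(func) -> str:
    """Hypothetical helper: build a 'module:qualname' style name."""
    return f"{func.__module__}:{func.__qualname__}"


class Reports:
    def daily(self) -> None:
        ...


# The derived name embeds the module path and the enclosing class,
# so moving or renaming either one changes the name.
derived = default_task_name(Reports.daily)
```

A name derived this way changes whenever the function moves to another module or class, which is the situation an explicit name avoids.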
cron
- Type: str | Cron
- Default: not set
Registers a declarative recurring schedule on app startup.
Use Cron(...) when you need advanced options (max_runs, misfire_policy, start_date, args/kwargs).
from datetime import datetime

from jobify import Cron, MisfirePolicy


@app.task(
    cron=Cron(
        "0 18 * * 1-5",
        max_runs=100,
        max_failures=5,
        misfire_policy=MisfirePolicy.ALL,
        start_date=datetime(2027, 1, 1),
        args=("financial",),
        kwargs={"recipient": "admin@example.com"},
    )
)
def daily_report(report_type: str, recipient: str) -> None:
    ...
See The Cron Object for full details.
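For readers less familiar with five-field cron expressions, the sketch below shows one way to read "0 18 * * 1-5" (minute, hour, day of month, month, day of week). It is a simplified, standalone matcher, not Jobify's parser: the helper names are hypothetical, only "*", single numbers, ranges, and comma lists are supported, and all five fields must match.

```python
from datetime import datetime


def _field_matches(field: str, value: int) -> bool:
    """Match one cron field; supports '*', 'N', 'A-B', and comma lists only."""
    if field == "*":
        return True
    for part in field.split(","):
        if "-" in part:
            low, high = part.split("-")
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Hypothetical helper: does `moment` fall on the given schedule?"""
    minute, hour, day, month, weekday = expression.split()
    # Cron counts Sunday as 0; isoweekday() counts Monday as 1 and Sunday as 7.
    cron_weekday = moment.isoweekday() % 7
    return (
        _field_matches(minute, moment.minute)
        and _field_matches(hour, moment.hour)
        and _field_matches(day, moment.day)
        and _field_matches(month, moment.month)
        and _field_matches(weekday, cron_weekday)
    )
```

Under this reading, "0 18 * * 1-5" matches 18:00 on Monday through Friday and nothing on weekends.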
retry
- Type: int | SmartRetry
- Default: not set (no retries)
Number of retry attempts after failure, or a SmartRetry object for advanced backoff and filtering.
from jobify import SmartRetry


@app.task(retry=3)  # Simple: 3 retries after first failure
def fragile_io() -> None:
    ...


@app.task(retry=SmartRetry(retries=5, initial_delay=2.0))  # Advanced
def custom_retry() -> None:
    ...
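The attempt counting behind retry=3 can be sketched in plain Python. This is an illustration under the assumption stated in the comment above (retries are counted after the first failure, so a task runs at most 1 + retry times); run_with_retries is a hypothetical helper, and backoff delays are left out.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_with_retries(func: Callable[[], T], retries: int) -> T:
    """Hypothetical helper: call func once, then retry up to `retries` times."""
    attempts_allowed = 1 + retries  # the first call is not a retry
    for attempt in range(1, attempts_allowed + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts_allowed:
                raise  # retries exhausted: surface the last error
    raise AssertionError("unreachable for retries >= 0")


calls: List[int] = []


def fragile_io() -> str:
    """Fails three times, then succeeds on the fourth call."""
    calls.append(1)
    if len(calls) < 4:
        raise ConnectionError("temporary failure")
    return "ok"


result = run_with_retries(fragile_io, retries=3)
```

With retries=3 the function above is called four times in total: one initial attempt plus three retries.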
timeout
- Type: float
- Default: not set (no timeout)
Execution timeout in seconds. If the task runs longer than this, it is marked as timed out and follows the failure path.
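As an illustration of what a timeout in seconds means for an async task, the sketch below uses asyncio.wait_for from the standard library. It is not Jobify's implementation; run_with_timeout and the "timed_out" return value are assumptions made for the example.

```python
import asyncio


async def run_with_timeout(job, timeout: float):
    """Hypothetical helper: await job() but give up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(job(), timeout=timeout)
    except asyncio.TimeoutError:
        # A real scheduler would route this into its failure path instead.
        return "timed_out"


async def slow_job() -> str:
    await asyncio.sleep(1.0)
    return "done"


async def fast_job() -> str:
    return "done"
```

Calling asyncio.run(run_with_timeout(slow_job, 0.01)) cancels the sleeping job and takes the timeout branch, while fast_job finishes normally.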
durable
- Type: bool
- Default: True
Controls whether scheduled jobs are persisted and restored after restart.
High-frequency cron optimization
For very high-frequency cron jobs (for example, every second), consider durable=False to reduce storage churn and write load.
run_mode
- Type: RunMode.MAIN | RunMode.THREAD | RunMode.PROCESS
- Default:
  - async functions: RunMode.MAIN
  - sync functions: RunMode.THREAD
Execution model for the task function.
- RunMode.MAIN: run on the event loop (async def).
- RunMode.THREAD: run a sync function in ThreadPoolExecutor.
- RunMode.PROCESS: run a sync function in ProcessPoolExecutor.
from jobify import RunMode


@app.task(run_mode=RunMode.MAIN)
async def async_job() -> None:
    ...


@app.task(run_mode=RunMode.THREAD)
def sync_io_job() -> None:
    ...


@app.task(run_mode=RunMode.PROCESS)
def cpu_heavy_job() -> None:
    ...
Async + THREAD/PROCESS
If the function is async def, Jobify keeps execution on the MAIN event loop.
Setting THREAD or PROCESS for an async function is ignored, with a runtime warning.
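The defaults and the async override described above can be summarized in a small standalone sketch. Mode is a local stand-in for RunMode so the example runs without Jobify, and resolve_mode is a hypothetical helper; the exact warning category and message are assumptions.

```python
import inspect
import warnings
from enum import Enum
from typing import Callable, Optional


class Mode(Enum):
    """Local stand-in for RunMode (assumption, not the Jobify class)."""
    MAIN = "main"
    THREAD = "thread"
    PROCESS = "process"


def resolve_mode(func: Callable, requested: Optional[Mode] = None) -> Mode:
    """Hypothetical helper mirroring the documented defaults and override."""
    if inspect.iscoroutinefunction(func):
        if requested not in (None, Mode.MAIN):
            # Async functions stay on the event loop; the request is ignored.
            warnings.warn(
                f"{requested.name} ignored for async function", RuntimeWarning
            )
        return Mode.MAIN
    # Sync functions default to a thread unless a mode was requested.
    return Mode.THREAD if requested is None else requested


async def async_job() -> None:
    ...


def sync_job() -> None:
    ...
```

In this sketch, resolve_mode(sync_job) yields Mode.THREAD, while resolve_mode(async_job, Mode.PROCESS) still yields Mode.MAIN and emits a warning.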
exception_handlers
- Type: MappingExceptionHandlers | None
- Default: not set
Task-local exception map. Takes precedence over router-level and global handlers.
from jobify import JobContext


def on_value_error(exc: Exception, context: JobContext) -> str:
    return "fallback"


@app.task(exception_handlers={ValueError: on_value_error})
def task_with_recovery() -> None:
    ...
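The precedence rule (task-level first, then router-level, then global) can be pictured as a layered lookup. The sketch below is an assumption about how such a lookup might work, not Jobify's code: find_handler is hypothetical, and matching subclasses through the exception's MRO is a design choice made for the example.

```python
from typing import Callable, Mapping, Optional, Type

Handler = Callable[[BaseException, object], object]
Handlers = Mapping[Type[BaseException], Handler]


def find_handler(exc: BaseException, *layers: Handlers) -> Optional[Handler]:
    """Hypothetical lookup: earlier layers win; within a layer the most
    specific exception class wins."""
    for handlers in layers:  # pass task, then router, then global
        for cls in type(exc).__mro__:
            if cls in handlers:
                return handlers[cls]
    return None


task_handlers = {ValueError: lambda exc, ctx: "task fallback"}
router_handlers = {
    ValueError: lambda exc, ctx: "router fallback",
    KeyError: lambda exc, ctx: "router key",
}
global_handlers = {Exception: lambda exc, ctx: "global fallback"}
```

Here a ValueError resolves to the task-level handler even though the router also defines one, and an exception no narrower layer knows about falls through to the global handler.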
metadata
- Type: Mapping[str, Any] | None
- Default: not set
Custom task attributes used by middleware/policies. Common patterns: priority, tenant routing, feature flags, skip/debug markers.
With QueueMiddleware, larger metadata.priority values run earlier in PriorityQueue mode.
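To illustrate "larger priority runs earlier", here is a standalone sketch built on the standard-library heapq module. It does not use QueueMiddleware; the push helper, the default priority of 0, and the tie-breaking by submission order are all assumptions made for the example.

```python
import heapq
from itertools import count
from typing import Any, List, Mapping, Tuple

_order = count()  # tie-breaker so equal priorities keep submission order


def push(
    queue: List[Tuple[int, int, str]], name: str, metadata: Mapping[str, Any]
) -> None:
    """Hypothetical helper: enqueue a task name by metadata['priority']."""
    priority = metadata.get("priority", 0)  # default of 0 is an assumption
    # heapq pops the smallest item first, so negate the priority
    # to make the largest value come out first.
    heapq.heappush(queue, (-priority, next(_order), name))


queue: List[Tuple[int, int, str]] = []
push(queue, "cleanup", {"priority": 1})
push(queue, "reports:daily", {"priority": 100, "team": "finance"})
push(queue, "emails", {"priority": 10})

# Drain the queue to see the execution order.
run_order = [heapq.heappop(queue)[2] for _ in range(len(queue))]
```

The task with priority 100 is popped first and the one with priority 1 last, regardless of submission order.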