Exception Handlers

Exception handlers provide custom logic for dealing with errors during task execution. They are essential for logging, monitoring, and implementing custom recovery strategies.

  • Hierarchical Priority

    Handlers follow a "most-specific-first" rule: Task > Router > Global.

  • Retry Control

    Re-raise to trigger retries, or return a value to recover and mark the job as SUCCESS.

  • Fatal Failures

    List an exception in SmartRetry's exclude_exceptions to abort all future retries and fail the job immediately.

  • Scoped Logic

    Define handlers at the Task, Router, or Global level for fine-grained control.

How It Works

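Jobify selects a handler by matching the type of the raised exception (or one of its parent classes), starting at the most specific level: Task first, then Router, then Global. As soon as a handler is found at one level, it executes and the broader levels are ignored. For example, a match at the task level means the Router and Global handlers are never consulted.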
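The lookup described above can be sketched in plain Python. This is an illustration only, not Jobify's actual implementation: `resolve_handler` and the three handler dictionaries are hypothetical names, and the sketch assumes that level precedence is applied first (Task, then Router, then Global), with the exception's class hierarchy walked within each level.

```python
from typing import Callable, Optional

Handler = Callable[[Exception, object], object]
HandlerMap = dict[type[BaseException], Handler]


def resolve_handler(
    exc: BaseException,
    task_handlers: HandlerMap,
    router_handlers: HandlerMap,
    global_handlers: HandlerMap,
) -> Optional[Handler]:
    """Hypothetical lookup: return the first match, searching Task > Router > Global."""
    for level in (task_handlers, router_handlers, global_handlers):
        # Walk the exception's class hierarchy, most specific class first.
        for cls in type(exc).__mro__:
            if cls in level:
                return level[cls]
    return None  # No handler registered at any level.


def handle_task(exc, context):
    return "task"


def handle_router(exc, context):
    return "router"


def handle_global(exc, context):
    return "global"


# A KeyError matches the router's LookupError entry through its parent class.
chosen = resolve_handler(
    KeyError("missing"),
    task_handlers={},
    router_handlers={LookupError: handle_router},
    global_handlers={Exception: handle_global},
)
```

Here `chosen` is `handle_router`: the task level has no match, and the search stops at the router level before the global `Exception` entry is considered.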

Handler Signature

A handler is a sync or async callable taking exactly two arguments:

  • exc: The exception instance raised by the task.
  • context: The JobContext of the current execution.

async def my_handler(exc: Exception, context: JobContext) -> None:
    print(f"Job {context.job.id} failed: {exc}")
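Because a handler may be either sync or async, whatever invokes it has to cope with both forms. The following is a minimal sketch of one way to do that. It assumes nothing about Jobify's internals: `call_handler` is a hypothetical helper, and an empty dict stands in for the JobContext.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_handler(
    handler: Callable[[Exception, Any], Any],
    exc: Exception,
    context: Any,
) -> Any:
    """Hypothetical helper: invoke a sync or async handler and return its result."""
    result = handler(exc, context)
    # An async handler returns an awaitable that still has to be awaited.
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_handler(exc: Exception, context: Any) -> str:
    return f"sync saw {type(exc).__name__}"


async def async_handler(exc: Exception, context: Any) -> str:
    return f"async saw {type(exc).__name__}"


async def main() -> list[str]:
    err = ValueError("boom")
    return [
        await call_handler(sync_handler, err, context={}),
        await call_handler(async_handler, err, context={}),
    ]


results = asyncio.run(main())
```

Both handlers share the same two-argument signature, so the caller only needs to check whether the return value is awaitable.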

Configuration Levels

Global handlers are applied to every task in the Jobify application.

app = Jobify(
    exception_handlers={
        TypeError: global_type_error_handler
    }
)

Router handlers are applied to all tasks within a specific JobRouter.

router = JobRouter(
    exception_handlers={
        ValueError: router_value_error_handler
    }
)

Task handlers are specific to a single @app.task and override both the Router and Global levels.

@app.task(
    exception_handlers={
        TimeoutError: task_timeout_handler
    }
)
async def my_task(): ...

Execution Behavior

How your handler exits determines the final state of the job and whether retries occur.

To allow RetryMiddleware to catch the error and retry the task, you must re-raise the exception.

import logging

log = logging.getLogger(__name__)

async def my_handler(exc: Exception, context: JobContext):
    log.error("Retrying...")
    raise exc  # Re-raise to trigger retry logic

If the handler returns a value (or simply exits without raising), the job is marked as SUCCESS. The returned value becomes the value returned by job.result().

async def recovery_handler(exc, ctx) -> str:
    return "fallback_value"  # Job status: SUCCESS

To stop all retries and fail immediately, raise an exception that is included in exclude_exceptions of your SmartRetry configuration.

from jobify import SmartRetry

# Define a custom fatal exception
class FatalError(Exception): ...

@app.task(
    retry=SmartRetry(retries=3, exclude_exceptions=(FatalError,))
)
async def my_task():
    raise FatalError("Unrecoverable")  # Fails immediately, no retries
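The three outcomes above can be illustrated with a small, self-contained simulation. It does not use Jobify. `run_job`, the "FAILED" status string, and the retry loop are hypothetical stand-ins, written under three assumptions: a handler's return value ends the job successfully, a re-raised exception consumes one retry, and an exception listed in `exclude_exceptions` stops the loop at once (here raised from the handler itself).

```python
import asyncio
from typing import Any, Awaitable, Callable


class FatalError(Exception):
    """Custom exception treated as non-retryable."""


async def run_job(
    task: Callable[[], Awaitable[Any]],
    handler: Callable[[Exception, Any], Awaitable[Any]],
    retries: int = 3,
    exclude_exceptions: tuple[type[Exception], ...] = (),
) -> tuple[str, Any, int]:
    """Simulated runner: return (status, result_or_error, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            try:
                return "SUCCESS", await task(), attempts
            except Exception as exc:
                # If the handler returns normally, the job recovers with its value.
                return "SUCCESS", await handler(exc, None), attempts
        except exclude_exceptions as exc:
            return "FAILED", exc, attempts  # Fatal: skip the remaining retries.
        except Exception as exc:
            if attempts > retries:
                return "FAILED", exc, attempts  # Retries exhausted.


async def always_fails() -> None:
    raise ValueError("bad input")


async def reraise(exc: Exception, context: Any) -> None:
    raise exc  # Lets the retry loop see the original error.


async def recover(exc: Exception, context: Any) -> str:
    return "fallback_value"


async def escalate(exc: Exception, context: Any) -> None:
    raise FatalError("Unrecoverable") from exc


async def main() -> tuple:
    return (
        await run_job(always_fails, reraise, retries=3),
        await run_job(always_fails, recover, retries=3),
        await run_job(always_fails, escalate, retries=3, exclude_exceptions=(FatalError,)),
    )


retried, recovered, fatal = asyncio.run(main())
```

In this simulation, `retried` ends as FAILED after four attempts (the initial run plus three retries), `recovered` is SUCCESS with "fallback_value" on the first attempt, and `fatal` is FAILED after a single attempt.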

Hierarchical Example

Evaluation Order

When TypeError is raised in process_report:

  1. Check the process_report task handlers (found, so handle_task runs).
  2. Ignore the Router and Global levels.

# Global
app = Jobify(exception_handlers={TypeError: handle_global})

# Router
router = JobRouter(prefix="reports", exception_handlers={TypeError: handle_router})

# Task
@router.task(exception_handlers={TypeError: handle_task})
async def process_report():
    raise TypeError("Specific error")