Queue Middleware

QueueMiddleware gives execution control under load: bounded buffering, controlled concurrency, and optional priority routing.

  • Backpressure: a bounded maxsize prevents unbounded in-memory job growth.
  • Throughput Control: workers caps concurrent executions for safer downstream load.
  • Priority Routing: asyncio.PriorityQueue plus metadata.priority runs critical jobs first.
  • Tuning: workers and queue sizes can be adjusted to match deployment capacity.

When to Enable

Use it when:

  • jobs come in bursts
  • DB/API concurrency is limited
  • some jobs are business-critical and must run earlier

Setup

import asyncio

from jobify import Jobify
from jobify.middleware import QueueMiddleware

app = Jobify()
app.add_middleware(
    QueueMiddleware(queue=asyncio.PriorityQueue(maxsize=1024), workers=10)
)

@app.task(metadata={"priority": 100})
async def critical_job() -> None:
    ...

@app.task(metadata={"priority": 10})
async def regular_job() -> None:
    ...

Jobs with a higher metadata.priority value run earlier.

Default queue: asyncio.PriorityQueue(maxsize=1024).

import asyncio

from jobify import Jobify
from jobify.middleware import QueueMiddleware

app = Jobify()
app.add_middleware(
    QueueMiddleware(queue=asyncio.Queue(maxsize=200), workers=20)
)

With a plain asyncio.Queue, jobs run in strict FIFO order; use this when predictable ordering matters more than prioritization.

import asyncio

from jobify import Jobify
from jobify.middleware import QueueMiddleware

app = Jobify()
app.add_middleware(
    QueueMiddleware(queue=asyncio.LifoQueue(maxsize=200), workers=30)
)

Last in, first out: newest jobs are executed first. Useful for bursty/non-critical work where newer events are usually more relevant than older ones.
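The newest-first ordering is a property of the queue itself. A minimal sketch using only asyncio (no jobify APIs involved):

```python
import asyncio

async def lifo_demo() -> list[str]:
    # A LIFO queue hands the consumer the most recently added item first.
    q: asyncio.LifoQueue[str] = asyncio.LifoQueue(maxsize=200)
    for event in ["evt-1", "evt-2", "evt-3"]:
        await q.put(event)
    # The consumer sees the newest event first: evt-3, evt-2, evt-1.
    return [await q.get() for _ in range(3)]

order = asyncio.run(lifo_demo())
```

Here order is ["evt-3", "evt-2", "evt-1"]: the last event in is the first one out.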

Priority Examples

1) Critical first, regular later

import asyncio

from jobify import Jobify
from jobify.middleware import QueueMiddleware

app = Jobify()
app.add_middleware(QueueMiddleware(queue=asyncio.PriorityQueue(maxsize=256), workers=1))


@app.task(metadata={"priority": 100})
async def charge_payment(order_id: str) -> str:
    return f"charged:{order_id}"


@app.task(metadata={"priority": 10})
async def send_analytics(event_id: str) -> str:
    return f"tracked:{event_id}"


async def main() -> None:
    async with app:
        low = await send_analytics.push("evt-1")
        high = await charge_payment.push("ord-1")
        await asyncio.gather(low.wait(), high.wait())

asyncio.run(main())

Even if low was enqueued first, charge_payment is executed earlier because its priority is higher.
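Note that asyncio.PriorityQueue always pops the *smallest* entry first, so "higher metadata.priority runs earlier" implies entries are enqueued with the priority negated. That is an assumption about the middleware's internals, but the resulting ordering can be sketched standalone (job names here are illustrative):

```python
import asyncio

async def priority_demo() -> list[str]:
    # asyncio.PriorityQueue pops the smallest tuple first, so negating
    # the priority makes higher-priority jobs come out earlier. The seq
    # counter keeps tuples comparable even when priorities are equal.
    q: asyncio.PriorityQueue[tuple[int, int, str]] = asyncio.PriorityQueue()
    jobs = [("send_analytics", 10), ("charge_payment", 100)]
    for seq, (name, priority) in enumerate(jobs):
        await q.put((-priority, seq, name))
    return [(await q.get())[2] for _ in range(2)]

order = asyncio.run(priority_demo())
```

Even though send_analytics was enqueued first, order is ["charge_payment", "send_analytics"]: (-100, ...) sorts before (-10, ...).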

2) Multiple levels (example policy)

PRIORITY_CRITICAL = 100
PRIORITY_USER_FLOW = 50
PRIORITY_BACKGROUND = 5

@app.task(metadata={"priority": PRIORITY_CRITICAL})
async def webhook_retry() -> None: ...

@app.task(metadata={"priority": PRIORITY_USER_FLOW})
async def user_notification() -> None: ...

@app.task(metadata={"priority": PRIORITY_BACKGROUND})
async def metrics_rollup() -> None: ...

Use numeric bands to keep priority rules explicit and consistent across the codebase.

Equal priority

Jobs with the same metadata.priority are not guaranteed to execute in strict FIFO order inside a PriorityQueue. If strict order matters, use asyncio.Queue or assign distinct priority values.
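If you need FIFO order among equal priorities at the queue level, the standard pattern is a monotonically increasing counter as a tie-breaker. A plain-asyncio sketch (not a jobify API):

```python
import asyncio
import itertools

async def stable_priority_demo() -> list[str]:
    # A monotonically increasing counter as the second tuple element
    # makes equal-priority entries pop in insertion (FIFO) order and
    # prevents Python from ever comparing the payloads themselves.
    counter = itertools.count()
    q: asyncio.PriorityQueue[tuple[int, int, str]] = asyncio.PriorityQueue()
    for name in ["job-a", "job-b", "job-c"]:  # all the same priority (50)
        await q.put((-50, next(counter), name))
    return [(await q.get())[2] for _ in range(3)]

order = asyncio.run(stable_priority_demo())
```

With the counter in place, order is ["job-a", "job-b", "job-c"] even though all three entries share one priority.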

Backpressure Behavior

What happens when the queue is full

When the queue is at maxsize, producers wait on enqueue until a worker frees a slot. This slows intake and keeps memory bounded instead of letting the backlog grow without limit.
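This waiting behavior is simply asyncio.Queue.put on a full queue; a self-contained sketch:

```python
import asyncio

async def backpressure_demo() -> bool:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)
    await q.put(1)  # fills the queue to its maxsize
    try:
        # A second put cannot complete until a consumer frees a slot,
        # so it times out here: the producer is being backpressured.
        await asyncio.wait_for(q.put(2), timeout=0.05)
        return False
    except asyncio.TimeoutError:
        return True

blocked = asyncio.run(backpressure_demo())
```

blocked is True: with no consumer draining the queue, the second put never completes, which is exactly the intake-slowing effect described above.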

Queue Choice

  • asyncio.Queue: FIFO, most predictable default for stable pipelines.
  • asyncio.PriorityQueue: prioritize critical work.
  • asyncio.LifoQueue: niche LIFO workloads.
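Whichever queue you pick, the underlying pattern is a bounded buffer drained by a fixed set of worker tasks. A rough plain-asyncio sketch of that pattern, assuming the middleware works roughly this way (the job payloads and helper names below are illustrative, not jobify internals):

```python
import asyncio

async def run_pool(jobs: list[int], workers: int = 3) -> list[int]:
    # Bounded buffer: producers block in q.put() once 8 jobs are waiting.
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=8)
    results: list[int] = []

    async def worker() -> None:
        # Each worker pulls jobs forever; at most `workers` jobs
        # execute concurrently.
        while True:
            job = await q.get()
            results.append(job * 2)  # stand-in for real job execution
            q.task_done()

    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    for job in jobs:
        await q.put(job)  # waits here whenever the buffer is full
    await q.join()        # blocks until every enqueued job is processed
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results

doubled = asyncio.run(run_pool([1, 2, 3, 4]))
```

The two tuning knobs below map directly onto this sketch: workers is the number of worker tasks, and queue.maxsize is the buffer bound that producers block on.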

Tuning Cheatsheet

Parameter      What it controls                       Practical start
workers        max concurrent executions              set to safe downstream concurrency
queue.maxsize  jobs buffered before producers wait    keep finite (for example, 100-1000)

Common mistakes

  • Unbounded queue sizes.
  • workers set higher than downstream service capacity.
  • Priority values without clear business rules.