Arq tutorial

Arq defaults to pickle, which is fast but opaque and unsafe with untrusted data. queuebridge provides msgpack serializers that understand Pydantic models.

Step 1: Install

pip install "queuebridge[arq]"

Step 2: Get the serializer pair

from queuebridge.arq import get_serializer_pair

serialize, deserialize = get_serializer_pair()

Step 3: Configure worker and client

Both the Arq worker and create_pool() must use the same serializers:

from arq.connections import RedisSettings

class WorkerSettings:
    functions = [process_order]
    redis_settings = RedisSettings()
    job_serializer = serialize
    job_deserializer = deserialize

When enqueueing from your app:

pool = await create_pool(
    RedisSettings(),
    job_serializer=serialize,
    job_deserializer=deserialize,
)

Step 4: Decorate your task with qb_task

Arq passes wire dicts into your function. @qb_task decodes them using your type hints:

from pydantic import BaseModel, validate_call
from queuebridge.arq import qb_task

class OrderCreate(BaseModel):
    id: int
    sku: str

@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="ok")

Apply @qb_task outside @validate_call.

Step 5: Typed job results

from queuebridge.arq import typed_result

job = await pool.enqueue_job("process_order", order=OrderCreate(id=1, sku="X"))
result = await typed_result(job, OrderResult)

Full example

See examples/arq_example/worker.py.

API reference