Celery tutorial

This guide walks through queuebridge's Celery integration from scratch. No prior queuebridge knowledge is required.

Step 1: Install

pip install "queuebridge[celery]"

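You also need a broker. Redis is a common choice; the command below installs its Python client, and the Redis server itself must be running separately: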

pip install redis

Step 2: Define your models

Use standard Pydantic v2 models:

from pydantic import BaseModel, Field

class OrderCreate(BaseModel):
    id: int
    sku: str = Field(min_length=1)

class OrderResult(BaseModel):
    id: int
    status: str
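
To make the role of these models concrete, here is a minimal sketch of the validation and dict round-trip that Pydantic v2 provides on its own, independent of queuebridge or Celery. It repeats the OrderCreate model so that it runs standalone; the variable names are illustrative only.

```python
from pydantic import BaseModel, Field, ValidationError


class OrderCreate(BaseModel):
    id: int
    sku: str = Field(min_length=1)


# A valid order dumps to a plain dict and validates back into an equal model.
order = OrderCreate(id=1, sku="ABC")
payload = order.model_dump()
restored = OrderCreate.model_validate(payload)

# An empty sku violates min_length=1, so construction fails.
try:
    OrderCreate(id=2, sku="")
    rejected = False
except ValidationError:
    rejected = True
```

The dict form (payload) is what a JSON-based task message can carry; the model form is what your code works with.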

Step 3: Create the Celery app and register queuebridge

Call register_queuebridge() once at startup in every process that uses the app, both the worker and the producer:

from celery import Celery
from queuebridge.celery import register_queuebridge

celery_app = Celery(
    "orders",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
register_queuebridge(celery_app)

This registers a Kombu serializer named queuebridge-json and sets it as the default task and result serializer.
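
The sketch below illustrates the general idea behind a model-aware JSON serializer. It is not queuebridge's actual implementation: FakeOrder, dumps, and loads are hypothetical names, and FakeOrder only imitates the model_dump() method of a Pydantic v2 model so that the example needs nothing beyond the standard library. Kombu accepts encoder/decoder pairs like this through kombu.serialization.register; how queuebridge wires that up internally is an assumption here.

```python
import json


class FakeOrder:
    """Hypothetical stand-in that imitates Pydantic v2's model_dump()."""

    def __init__(self, id, sku):
        self.id = id
        self.sku = sku

    def model_dump(self, mode="python"):
        return {"id": self.id, "sku": self.sku}


def _default(obj):
    # json calls this hook only for objects it cannot encode itself.
    dump = getattr(obj, "model_dump", None)
    if callable(dump):
        return dump(mode="json")
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable"
    )


def dumps(obj):
    """Encode a message body, turning model-like objects into dicts."""
    return json.dumps(obj, default=_default)


def loads(data):
    """Decode a message body; models come back as plain dicts."""
    return json.loads(data)


message = dumps({"args": [FakeOrder(1, "ABC")], "kwargs": {}})
decoded = loads(message)
```

Note that decoding yields plain dicts, not models. That asymmetry is why the later steps need pydantic=True on the worker and typed_result on the client.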

Step 4: Write your task

Pass pydantic=True so that Celery validates the incoming arguments on the worker and converts them to the annotated model type:

@celery_app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
    assert isinstance(order, OrderCreate)
    return OrderResult(id=order.id, status="processed")

Step 5: Enqueue with a model

Without queuebridge, you have to dump the model to a dict yourself:

process_order.delay(order.model_dump())  # old way

Now pass the model directly:

process_order.delay(OrderCreate(id=1, sku="ABC"))

Step 6: Get a typed result

Celery’s AsyncResult.get() still returns a dict. Wrap the result with typed_result() to get a model back:

from queuebridge.celery import typed_result

ar = process_order.delay(OrderCreate(id=1, sku="ABC"))
result = typed_result(ar, OrderResult).get(timeout=10)

assert isinstance(result, OrderResult)
print(result.status)

typed_result wraps the async result and proxies attributes like .id, .state, and .ready().
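
A wrapper with this behavior could be sketched as follows. This is an illustration of the proxy pattern, not queuebridge's source: TypedResult, FakeAsyncResult, and FakeOrderResult are hypothetical names, and the two Fake classes stand in for celery.result.AsyncResult and a Pydantic model so the example runs without a broker.

```python
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult."""

    id = "abc-123"
    state = "SUCCESS"

    def ready(self):
        return True

    def get(self, timeout=None):
        # A JSON result backend hands back a plain dict.
        return {"id": 1, "status": "processed"}


class FakeOrderResult:
    """Hypothetical stand-in that imitates Pydantic's model_validate()."""

    def __init__(self, id, status):
        self.id = id
        self.status = status

    @classmethod
    def model_validate(cls, data):
        return cls(**data)


class TypedResult:
    """Wraps an async result and validates get() into a model."""

    def __init__(self, async_result, model_cls):
        self._async_result = async_result
        self._model_cls = model_cls

    def get(self, *args, **kwargs):
        raw = self._async_result.get(*args, **kwargs)
        return self._model_cls.model_validate(raw)

    def __getattr__(self, name):
        # Reached only when normal lookup fails, so .id, .state,
        # .ready() and similar fall through to the wrapped result.
        return getattr(self._async_result, name)


typed = TypedResult(FakeAsyncResult(), FakeOrderResult)
result = typed.get(timeout=10)
```

Only get() is overridden; everything else is delegated, which keeps the wrapper usable wherever code expects the usual result attributes.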

Full example

See examples/celery_fastapi/ in the repository for a FastAPI app that enqueues orders and polls typed results.

Testing without Redis

Use eager mode in tests so that tasks run synchronously in the calling process instead of being sent to a broker:

celery_app.conf.update(
    task_always_eager=True,
    task_store_eager_result=True,
)

Common mistakes

Forgot register_queuebridge

Calling delay(model) raises TypeError: Object of type X is not JSON serializable, where X is your model class.

Forgot pydantic=True

The worker receives a plain dict instead of a validated model.

Using ar.get() instead of typed_result

You get a dict back, not a Pydantic model. That is expected; use typed_result on the client.

API reference