Cookbook¶
Practical recipes for common queuebridge patterns.
Nested models and collections¶
queuebridge encodes nested structures recursively:
from pydantic import BaseModel

class LineItem(BaseModel):
    sku: str
    qty: int

class Order(BaseModel):
    id: int
    items: list[LineItem]
    tags: set[str]
from queuebridge import encode, decode
order = Order(id=1, items=[LineItem(sku="A", qty=2)], tags={"urgent"})
wire = encode(order)
restored = decode(wire, Order)
assert restored == order
Pass an explicit type hint when decoding a collection:
batch = decode(wire_list, list[Order])  # wire_list holds an encoded list of Order models
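To show why the hint matters for collections, here is a minimal standard-library sketch of hint-driven recursive decoding. It is not queuebridge's implementation: to_plain and from_plain are hypothetical helpers, and dataclasses stand in for the Pydantic models so the snippet runs without extra dependencies.

```python
import json
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, get_args, get_origin


@dataclass
class LineItem:
    sku: str
    qty: int


@dataclass
class Order:
    id: int
    items: list[LineItem]
    tags: set[str]


def to_plain(value: Any) -> Any:
    """Recursively reduce a value to JSON-friendly primitives (hypothetical helper)."""
    if is_dataclass(value) and not isinstance(value, type):
        return {f.name: to_plain(getattr(value, f.name)) for f in fields(value)}
    if isinstance(value, (list, tuple)):
        return [to_plain(v) for v in value]
    if isinstance(value, (set, frozenset)):
        # JSON has no set type, so emit a sorted list for a stable result.
        return sorted(to_plain(v) for v in value)
    if isinstance(value, dict):
        return {k: to_plain(v) for k, v in value.items()}
    return value


def from_plain(data: Any, hint: Any) -> Any:
    """Rebuild a value from primitives, guided only by the type hint."""
    origin = get_origin(hint)
    if origin is list:
        (item_hint,) = get_args(hint)
        return [from_plain(v, item_hint) for v in data]
    if origin is set:
        (item_hint,) = get_args(hint)
        return {from_plain(v, item_hint) for v in data}
    if origin is dict:
        _key_hint, value_hint = get_args(hint)
        return {k: from_plain(v, value_hint) for k, v in data.items()}
    if is_dataclass(hint):
        return hint(**{f.name: from_plain(data[f.name], f.type) for f in fields(hint)})
    return data  # primitives pass through unchanged


order = Order(id=1, items=[LineItem(sku="A", qty=2)], tags={"urgent"})
wire_list = json.dumps([to_plain(order), to_plain(order)])

# Without the list[Order] hint the decoder would only see dicts and lists.
batch = from_plain(json.loads(wire_list), list[Order])
```

After JSON decoding, the set has become a list and each model a plain dict, so the hint is the only thing that tells the decoder what to rebuild.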
Optional and Union fields¶
For an Optional[OrderCreate] field, pass either None or a model instance; decoding relies on the type hint to rebuild the model.
For Union[OrderA, OrderB], prefer tagged envelopes when encoding (the default for models).
Decoding tries a Pydantic discriminated union first, then falls back to trying each union arm.
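The sketch below illustrates why a tag is preferable to trying each arm. It uses only the standard library and covers just the tag lookup and the arm-by-arm fallback, not Pydantic's discriminated unions. The "__type__" key, the ARMS registry, and the wrap and decode_union helpers are all assumptions made for illustration; queuebridge's real envelope layout may differ.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class OrderA:
    id: int
    sku: str


@dataclass
class OrderB:
    id: int
    email: str


# Hypothetical tag key and class registry, not queuebridge's wire format.
TAG_KEY = "__type__"
ARMS = {"OrderA": OrderA, "OrderB": OrderB}


def wrap(model: Any) -> dict:
    """Build a tagged envelope: the class name plus the field data."""
    return {TAG_KEY: type(model).__name__, "data": dict(vars(model))}


def decode_union(payload: dict, arms: tuple[type, ...]) -> Any:
    """Use the tag when present; otherwise try each arm in the given order."""
    tag = payload.get(TAG_KEY)
    if tag is not None:
        return ARMS[tag](**payload["data"])
    for arm in arms:
        try:
            return arm(**payload)
        except TypeError:
            continue  # field names did not match this arm's constructor
    raise ValueError("payload matches no union arm")


tagged = decode_union(wrap(OrderB(id=2, email="a@example.com")), (OrderA, OrderB))
untagged = decode_union({"id": 1, "sku": "X"}, (OrderA, OrderB))
```

With a tag, the decoder picks the class directly. Without one, the result depends on which arm happens to accept the fields first, which becomes ambiguous as soon as two arms share a shape.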
Testing Celery without a broker¶
app.conf.update(
    task_always_eager=True,
    task_store_eager_result=True,
)
ar = my_task.delay(OrderCreate(id=1, sku="X"))
result = typed_result(ar, OrderResult).get()
Testing Dramatiq with StubBroker¶
from dramatiq import Worker
from dramatiq.brokers.stub import StubBroker
from queuebridge.dramatiq import register_queuebridge
broker = StubBroker()
register_queuebridge(broker)
broker.declare_queue("default")
worker = Worker(broker, worker_timeout=100)
worker.start()
try:
    my_actor.send(OrderCreate(id=1, sku="X"))
    broker.join("default", timeout=5000)
finally:
    worker.stop()
Using encode/decode outside task queues¶
The codec works standalone for APIs, caches, or CLI tools:
from queuebridge import encode, decode
payload = encode({"orders": [order1, order2]})
# store payload in Redis, S3, etc.
restored = decode(payload, dict[str, list[Order]])
Manual wire inspection¶
from queuebridge import encode
from queuebridge.codec import is_qb_envelope, decode_wire
wire = encode(OrderCreate(id=1, sku="A"))
assert is_qb_envelope(wire)
model = decode_wire(wire)  # no hint needed when tags are present
FastAPI + Celery pattern¶
See examples/celery_fastapi/app.py:
1. FastAPI validates the HTTP body into OrderCreate.
2. process_order.delay(order) enqueues the model.
3. The poll endpoint uses typed_result(ar, OrderResult).get().
4. Return result.model_dump() for the JSON response.
Arq client + worker checklist¶
- Create the pair: serialize, deserialize = get_serializer_pair()
- Set them on WorkerSettings and create_pool(..., job_serializer=..., job_deserializer=...)
- Decorate tasks with @qb_task, then @validate_call
- Use await typed_result(job, ReturnModel) when fetching results
Common errors¶
| Error | Fix |
|---|---|
| | Call |
| Worker gets | Add |
| | Use |
| Arq | Both sides must use custom serializers; drain old pickle jobs |
| | Model module not importable on worker; check shared package layout |
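For the last row of the table, a quick way to check the shared package layout is to resolve the model's import path inside the worker's environment. The helper below is a hypothetical standard-library sketch, not part of queuebridge, and the "module:ClassName" path format is an assumption chosen for the example.

```python
import importlib
from typing import Optional


def import_problem(dotted_path: str) -> Optional[str]:
    """Return None if 'package.module:ClassName' resolves, else a short reason."""
    module_name, _, attr = dotted_path.partition(":")
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        # ModuleNotFoundError is a subclass of ImportError, so it lands here too.
        return f"cannot import {module_name!r}: {exc}"
    if attr and not hasattr(module, attr):
        return f"{module_name!r} has no attribute {attr!r}"
    return None


# Stand-in paths; replace them with your own model's module and class name.
ok = import_problem("json:JSONDecoder")
missing = import_problem("no_such_shared_pkg.models:Order")
```

Running the same check in both the producer and the worker environment shows quickly whether the two sides see the same package.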
Smoke test script¶
Run the full integration smoke test (no Redis required):
pip install "queuebridge[all]"
python examples/smoke_test_complex.py