How it works¶
queuebridge has two layers:
Codec (
queuebridge.codec): encode/decode Python values to a JSON-safe wire format.Backend adapters: plug the codec into Celery, Dramatiq, or Arq.
Your app queuebridge Task queue
-------- ----------- ----------
OrderCreate model --> encode() --> JSON / msgpack on wire
__qb__ tags
Worker task <-- decode + hints <-- wire dict
receives model
The flow (Celery example)¶
Producer:
process_order.delay(OrderCreate(...))Serializer: Kombu calls
queuebridge.encode()on the message body.Broker: JSON bytes travel over Redis/RabbitMQ.
Worker: Celery
pydantic=Truevalidates kwargs dicts into models.Return: Worker returns
OrderResult; Celery dumps to dict; codec encodes for result backend.Client:
typed_result(ar, OrderResult).get()decodes to a model.
What each backend does¶
Backend |
Encode hook |
Decode hook |
|---|---|---|
Celery |
Kombu |
Worker |
Dramatiq |
|
|
Arq |
msgpack + |
|
Type hints matter¶
Decoding uses your function annotations via typing.get_type_hints and Pydantic’s TypeAdapter.
If the wire value is a plain dict and your parameter is order: OrderCreate, queuebridge calls OrderCreate.model_validate(dict).
If the wire value has a __qb__ tag, the tag’s type name is used to reconstruct the object.
See Wire format for tag details.