API reference

This section is generated from the package source. For narrative guides, see the tutorials.

Core

queuebridge: bidirectional Pydantic serialization for task queues.

Pass Pydantic models to .delay(), .send(), or enqueue_job(). Get models back from results. Supports Celery, Dramatiq, and Arq.

Example:

from queuebridge import encode, decode
from queuebridge.celery import register_queuebridge, typed_result

queuebridge.decode(value, hint=typing.Any, *, strict=False)[source]

Recursively decode a wire value to Python using an optional type hint.

Parameters:
  • value (Any) – Wire data (primitives, containers, or __qb__ envelopes).

  • hint (Any) – Type annotation used for validation (e.g. OrderCreate).

  • strict (bool) – When True, raise QueuebridgeDecodeError if decoding fails.

Return type:

Any

Returns:

Decoded Python object.

queuebridge.encode(value, *, tag_models=True)[source]

Recursively transform a value into a JSON-serializable structure.

Parameters:
  • value (Any) – Python object to encode (models, UUID, datetime, containers, etc.).

  • tag_models (bool) – When True, wrap BaseModel instances in __qb__ envelopes.

Return type:

Any

Returns:

JSON-compatible primitives, lists, dicts, or tagged envelopes.

Raises:

QueuebridgeEncodeError – If the type is not supported.
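The recursive transformation described for encode can be sketched with the standard library alone. This is an illustration, not the package's implementation: sketch_encode is a hypothetical name, a dataclass stands in for a Pydantic BaseModel, TypeError stands in for QueuebridgeEncodeError, and the envelope layout is assumed from the QBEnvelope fields (t, v, d) listed under Types and exceptions.

```python
import json
import uuid
from dataclasses import dataclass, is_dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class OrderCreate:  # hypothetical stand-in for a Pydantic model
    id: uuid.UUID
    created: datetime


def sketch_encode(value: Any, *, tag_models: bool = True) -> Any:
    """Recursively reduce a value to JSON-compatible data (illustrative only)."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (list, tuple)):
        return [sketch_encode(v, tag_models=tag_models) for v in value]
    if isinstance(value, dict):
        return {str(k): sketch_encode(v, tag_models=tag_models) for k, v in value.items()}
    if is_dataclass(value) and not isinstance(value, type):
        data = {k: sketch_encode(v, tag_models=tag_models) for k, v in vars(value).items()}
        if not tag_models:
            return data  # plain dict, no envelope
        cls = type(value)
        fqn = f"{cls.__module__}.{cls.__qualname__}"
        # Assumed envelope layout: type name, wire version, payload.
        return {"__qb__": {"t": fqn, "v": 1, "d": data}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


order = OrderCreate(id=uuid.UUID(int=1), created=datetime(2024, 1, 1, tzinfo=timezone.utc))
tagged = sketch_encode(order)
plain = sketch_encode(order, tag_models=False)
wire = json.dumps(tagged)  # succeeds because only JSON-compatible types remain
```

With tag_models=False the model collapses to a plain dict, which is the form the Celery serializer section says it sends to workers.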

Codec

Core encode/decode codec and wire-format helpers for queuebridge.

queuebridge.codec.class_fqn(obj)[source]

Return the fully-qualified name of a type or instance.

Example: class_fqn(OrderCreate) -> "myapp.models.OrderCreate".

Parameters:

obj (type[Any] | object) – A class or instance.

Return type:

str

Returns:

"{module}.{qualname}" string used in __qb__ envelopes.

queuebridge.codec.import_fqn(fqn)[source]

Import and return a type from its fully-qualified name.

Used when decoding __qb__ envelopes. Respects ALLOWED_MODULE_PREFIXES when that tuple is non-empty.

Parameters:

fqn (str) – e.g. "myapp.models.OrderCreate".

Return type:

type[Any]

Returns:

The resolved type.

Raises:

QueuebridgeSecurityError – If the import is blocked by ALLOWED_MODULE_PREFIXES.

queuebridge.codec.is_qb_envelope(value)[source]

Return True if value is a queuebridge __qb__ tagged envelope.

Return type:

bool

Parameters:

value (Any)
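A check of this kind might look like the sketch below. The exact rule the package applies is not stated here, so two points are assumptions: that an envelope is a dict whose only key is the tag, and that the inner dict must carry the t, v, and d fields of QBEnvelope. sketch_is_envelope is a hypothetical name.

```python
from typing import Any

QB_TAG = "__qb__"  # mirrors the documented queuebridge.types.QB_TAG value


def sketch_is_envelope(value: Any) -> bool:
    """True for a dict whose only key is the tag and whose inner dict has t, v, d."""
    if not isinstance(value, dict) or set(value) != {QB_TAG}:
        return False
    inner = value[QB_TAG]
    return isinstance(inner, dict) and {"t", "v", "d"} <= set(inner)


tagged = {QB_TAG: {"t": "myapp.models.OrderCreate", "v": 1, "d": {"id": 1}}}
looks_tagged = sketch_is_envelope(tagged)
looks_plain = sketch_is_envelope({"id": 1})
```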

queuebridge.codec.encode(value, *, tag_models=True)[source]

Recursively transform a value into a JSON-serializable structure.

Parameters:
  • value (Any) – Python object to encode (models, UUID, datetime, containers, etc.).

  • tag_models (bool) – When True, wrap BaseModel instances in __qb__ envelopes.

Return type:

Any

Returns:

JSON-compatible primitives, lists, dicts, or tagged envelopes.

Raises:

QueuebridgeEncodeError – If the type is not supported.

queuebridge.codec.decode_wire(value)[source]

Recursively unwrap __qb__ envelopes without type hints.

Useful when you know the wire data contains tags but you do not have function annotations (e.g. Dramatiq message decode path).

Parameters:

value (Any) – Wire data (nested dicts/lists with optional envelopes).

Return type:

Any

Returns:

Python objects with all envelopes resolved.
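The hint-free unwrapping described above amounts to a recursive walk over lists and dicts. The sketch below assumes the envelope layout from QBEnvelope and replaces the real FQN import with a small hypothetical registry (REGISTRY) so that it runs without any application package; sketch_decode_wire and OrderCreate are illustrative names.

```python
from dataclasses import dataclass
from typing import Any, Callable

QB_TAG = "__qb__"


@dataclass
class OrderCreate:  # hypothetical model
    id: int
    sku: str


# Hypothetical registry standing in for import_fqn: FQN string -> constructor.
REGISTRY: dict[str, Callable[..., Any]] = {"myapp.models.OrderCreate": OrderCreate}


def sketch_decode_wire(value: Any) -> Any:
    """Walk nested lists/dicts and rebuild every tagged envelope found."""
    if isinstance(value, list):
        return [sketch_decode_wire(v) for v in value]
    if isinstance(value, dict):
        if set(value) == {QB_TAG}:
            inner = value[QB_TAG]
            fields = sketch_decode_wire(inner["d"])  # resolve nested envelopes first
            return REGISTRY[inner["t"]](**fields)
        return {k: sketch_decode_wire(v) for k, v in value.items()}
    return value  # primitives pass through unchanged


wire = {"orders": [{QB_TAG: {"t": "myapp.models.OrderCreate", "v": 1, "d": {"id": 1, "sku": "X"}}}]}
decoded = sketch_decode_wire(wire)
```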

queuebridge.codec.decode(value, hint=typing.Any, *, strict=False)[source]

Recursively decode a wire value to Python using an optional type hint.

Parameters:
  • value (Any) – Wire data (primitives, containers, or __qb__ envelopes).

  • hint (Any) – Type annotation used for validation (e.g. OrderCreate).

  • strict (bool) – When True, raise QueuebridgeDecodeError if decoding fails.

Return type:

Any

Returns:

Decoded Python object.

Types and exceptions

Wire format constants, envelope types, and exceptions.

queuebridge.types.QB_TAG = '__qb__'

JSON key for tagged queuebridge envelopes on the wire.

queuebridge.types.QB_VERSION = 1

Current wire format version (stored in each envelope).

queuebridge.types.ALLOWED_MODULE_PREFIXES: tuple[str, ...] = ()

Module prefixes allowed for FQN import during decode.

Empty tuple means all modules are allowed. Set to e.g. ("myapp.",) in v0.2+.
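To make the allowlist behaviour concrete, here is a minimal sketch of an FQN round trip with a prefix check. It is not the package's code: the sketch_ names and SketchSecurityError are hypothetical, the prefixes are passed as an argument rather than read from a module-level constant, and matching the prefix against the full name (rather than only the module part) is an assumption.

```python
import importlib
from fractions import Fraction
from typing import Any


class SketchSecurityError(Exception):
    """Hypothetical stand-in for QueuebridgeSecurityError."""


def sketch_class_fqn(obj: Any) -> str:
    """Return "{module}.{qualname}" for a class or for an instance's class."""
    cls = obj if isinstance(obj, type) else type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def sketch_import_fqn(fqn: str, allowed_prefixes: tuple = ()) -> Any:
    """Resolve "pkg.module.Name", refusing names outside the allowlist."""
    # An empty tuple disables the check, matching the documented default.
    if allowed_prefixes and not fqn.startswith(allowed_prefixes):
        raise SketchSecurityError(f"import of {fqn!r} is not allowed")
    # Simplification: handles top-level classes only, not nested qualnames.
    module_name, _, attr = fqn.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


fqn = sketch_class_fqn(Fraction(1, 2))  # "fractions.Fraction"
resolved = sketch_import_fqn(fqn, ("fractions.",))
```

The check runs before importlib.import_module is called, so a blocked name never triggers module import side effects.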

class queuebridge.types.QBEnvelope[source]

Bases: TypedDict

Shape of the inner envelope at value[QB_TAG].

t: str
v: int
d: Any

exception queuebridge.types.QueuebridgeError[source]

Bases: Exception

Base exception for queuebridge.

exception queuebridge.types.QueuebridgeEncodeError[source]

Bases: QueuebridgeError

Raised when a value cannot be encoded (unsupported type).

exception queuebridge.types.QueuebridgeDecodeError[source]

Bases: QueuebridgeError

Raised when a wire value cannot be decoded.

exception queuebridge.types.QueuebridgeSecurityError[source]

Bases: QueuebridgeDecodeError

Raised when FQN import is blocked by ALLOWED_MODULE_PREFIXES.
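Because QueuebridgeSecurityError derives from QueuebridgeDecodeError, the order of except clauses matters when both are handled. The sketch below redefines the documented hierarchy locally so that it runs without the package installed; classify is a hypothetical helper.

```python
class QueuebridgeError(Exception): ...
class QueuebridgeEncodeError(QueuebridgeError): ...
class QueuebridgeDecodeError(QueuebridgeError): ...
class QueuebridgeSecurityError(QueuebridgeDecodeError): ...


def classify(exc: QueuebridgeError) -> str:
    """Show which handler catches a given queuebridge-style exception."""
    try:
        raise exc
    except QueuebridgeSecurityError:
        # Must come before the decode handler, which would also match it.
        return "blocked"
    except QueuebridgeDecodeError:
        return "undecodable"
    except QueuebridgeError:
        return "other queuebridge failure"


outcomes = [
    classify(QueuebridgeSecurityError("myapp.secret")),
    classify(QueuebridgeDecodeError("bad payload")),
    classify(QueuebridgeEncodeError("socket object")),
]
```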

Hints

Type-hint utilities for decoding task arguments and return values.

class queuebridge.hints.TaskSignature(params, return_type)[source]

Bases: object

Cached type hints for a task function.

Parameters:
  • params (dict[str, Any]) – Mapping of parameter name to annotation (skips self, cls, ctx).

  • return_type (Any) – Return annotation, or Any if missing.

params: dict[str, Any]
return_type: Any

__init__(params, return_type)

Return type:

None

queuebridge.hints.get_task_signature(fn)[source]

Extract and cache parameter/return type hints from a callable.

Parameters:

fn (Callable[..., Any]) – Task function (sync, async, or decorated).

Return type:

TaskSignature

Returns:

TaskSignature with params and return_type.
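Extracting and caching hints can be done with typing.get_type_hints and functools.lru_cache, as in the sketch below. Whether the package caches this way is not stated; SketchSignature and sketch_get_signature are hypothetical names, and the skipped parameter names are taken from the TaskSignature description.

```python
import inspect
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Callable, get_type_hints

SKIPPED = {"self", "cls", "ctx"}


@dataclass
class SketchSignature:
    params: dict[str, Any]
    return_type: Any


@lru_cache(maxsize=None)
def sketch_get_signature(fn: Callable[..., Any]) -> SketchSignature:
    """Collect annotations once per function; missing ones default to Any."""
    hints = get_type_hints(fn)
    params = {
        name: hints.get(name, Any)
        for name in inspect.signature(fn).parameters
        if name not in SKIPPED
    }
    return SketchSignature(params=params, return_type=hints.get("return", Any))


async def process(ctx, order_id: int, note: str = "") -> bool:
    return True


sig = sketch_get_signature(process)
```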

queuebridge.hints.decode_args(fn, args, kwargs)[source]

Decode wire args and kwargs using fn’s type hints.

Skips decoding for self, cls, and ctx parameters (Arq/Celery).

Parameters:
  • fn (Callable[..., Any]) – Task function whose annotations guide decoding.

  • args (tuple[Any, ...]) – Positional wire values.

  • kwargs (dict[str, Any]) – Keyword wire values.

Return type:

tuple[tuple[Any, ...], dict[str, Any]]

Returns:

Tuple of (decoded_args, decoded_kwargs).
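The pairing of wire values with annotations can be sketched as follows. The real function takes no converter argument; the convert callback here is a hypothetical stand-in for decode so the example stays self-contained, and sketch_decode_args is an illustrative name.

```python
import inspect
from typing import Any, Callable, get_type_hints


def sketch_decode_args(
    fn: Callable[..., Any],
    args: tuple,
    kwargs: dict,
    convert: Callable[[Any, Any], Any],
) -> tuple:
    """Apply convert(value, hint) to each argument that has an annotation."""
    hints = get_type_hints(fn)
    names = list(inspect.signature(fn).parameters)
    skipped = {"self", "cls", "ctx"}

    def one(name: str, value: Any) -> Any:
        if name in skipped or name not in hints:
            return value  # leave context objects and unannotated values alone
        return convert(value, hints[name])

    head = tuple(one(n, v) for n, v in zip(names, args))
    tail = tuple(args[len(names):])  # extra positionals are passed through
    return head + tail, {n: one(n, v) for n, v in kwargs.items()}


def task(ctx, count: int, ratio: float = 1.0) -> None: ...


# Hypothetical converter: call the annotation as a constructor.
decoded_args, decoded_kwargs = sketch_decode_args(
    task, ({"job": 1}, "3"), {"ratio": "0.5"}, lambda value, hint: hint(value)
)
```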

queuebridge.hints.decode_return(fn, result)[source]

Decode a task return value using fn’s return type hint.

Parameters:
  • fn (Callable[..., Any]) – Task function.

  • result (Any) – Wire return value from the result backend.

Return type:

Any

Returns:

Decoded Python object.

Celery

queuebridge.celery.register_queuebridge(app, *, strict=False)[source]

Register the queuebridge-json serializer on a Celery application.

Configures task_serializer, result_serializer, and accept_content. Safe to call multiple times (idempotent).

Parameters:
  • app (Celery) – Celery application instance (worker and producer must both call this).

  • strict (bool) – Reserved for future strict decode behavior on the client.

Return type:

None

Example:

from celery import Celery
from queuebridge.celery import register_queuebridge

app = Celery("myapp", broker="redis://localhost:6379/0")
register_queuebridge(app)

queuebridge.celery.typed_result(async_result, return_type)[source]

Create a TypedAsyncResult for client-side typed get().

Parameters:
  • async_result (Any) – Celery async result.

  • return_type (type[TypeVar(T)]) – Expected return type (usually a Pydantic model).

Return type:

TypedAsyncResult[TypeVar(T)]

Returns:

Wrapper whose get() returns return_type instead of dict.

queuebridge.celery.install.register_queuebridge(app, *, strict=False)[source]

Register the queuebridge-json serializer on a Celery application.

Configures task_serializer, result_serializer, and accept_content. Safe to call multiple times (idempotent).

Parameters:
  • app (Celery) – Celery application instance (worker and producer must both call this).

  • strict (bool) – Reserved for future strict decode behavior on the client.

Return type:

None

Example:

from celery import Celery
from queuebridge.celery import register_queuebridge

app = Celery("myapp", broker="redis://localhost:6379/0")
register_queuebridge(app)

class queuebridge.celery.result.TypedAsyncResult(async_result, return_type)[source]

Bases: Generic[T]

Wraps Celery AsyncResult with typed get().

Proxies all other attributes (.id, .state, .ready(), etc.) to the underlying async result.

Parameters:
  • async_result (Any) – Celery async result from task.delay() or apply_async().

  • return_type (type[TypeVar(T)]) – Pydantic model or type hint for the task return value.

Example:

ar = process_order.delay(OrderCreate(id=1, sku="X"))
result = TypedAsyncResult(ar, OrderResult).get(timeout=10)

__init__(async_result, return_type)[source]

Parameters:
  • async_result (Any)

  • return_type (type[T])

Return type:

None

get(*args, **kwargs)[source]

Block until ready and decode the result to return_type.

Return type:

TypeVar(T)

queuebridge.celery.result.typed_result(async_result, return_type)[source]

Create a TypedAsyncResult for client-side typed get().

Parameters:
  • async_result (Any) – Celery async result.

  • return_type (type[TypeVar(T)]) – Expected return type (usually a Pydantic model).

Return type:

TypedAsyncResult[TypeVar(T)]

Returns:

Wrapper whose get() returns return_type instead of dict.
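The wrap-and-proxy pattern described for TypedAsyncResult can be illustrated without Celery. In this sketch FakeAsyncResult is a hypothetical stand-in for an AsyncResult, and a decode callback replaces the return_type argument; neither reflects the package's actual internals.

```python
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class SketchTypedResult(Generic[T]):
    """Wrap a result handle; decode get(), forward everything else."""

    def __init__(self, inner: Any, decode: Callable[[Any], T]) -> None:
        self._inner = inner
        self._decode = decode

    def get(self, *args: Any, **kwargs: Any) -> T:
        # Arguments such as timeout are forwarded unchanged.
        return self._decode(self._inner.get(*args, **kwargs))

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails, so get() above is not proxied.
        return getattr(self._inner, name)


class FakeAsyncResult:  # hypothetical stand-in for a Celery AsyncResult
    id = "abc123"

    def ready(self) -> bool:
        return True

    def get(self, timeout: Optional[float] = None) -> dict:
        return {"total": 3}


typed = SketchTypedResult(FakeAsyncResult(), lambda d: tuple(d.items()))
value = typed.get(timeout=1)
```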

queuebridge.celery.serializer.dumps(obj)[source]

Kombu encoder: JSON-serialize a Celery message body.

Uses encode(..., tag_models=False) so Celery pydantic=True can validate plain dicts on the worker.

Parameters:

obj (Any) – Celery/kombu message dict (task, args, kwargs, …).

Return type:

str

Returns:

JSON string.

queuebridge.celery.serializer.loads(data)[source]

Kombu decoder: parse JSON message body.

Returns wire dicts; worker-side pydantic=True validates models.

Parameters:

data (str) – JSON string from the broker.

Return type:

Any

Returns:

Parsed message dict.
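The untagged behaviour of dumps and loads can be approximated with the default hook of json.dumps. This sketch uses a dataclass in place of a Pydantic model and hypothetical names (sketch_dumps, sketch_loads); it shows only the shape of the data, not how the serializer is registered with Kombu.

```python
import json
import uuid
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime
from typing import Any


def _default(obj: Any) -> Any:
    """json.dumps hook: called only for objects json cannot serialize itself."""
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)  # plain dict, no envelope (the tag_models=False case)
    raise TypeError(f"unsupported type: {type(obj).__name__}")


def sketch_dumps(obj: Any) -> str:
    return json.dumps(obj, default=_default)


def sketch_loads(data: str) -> Any:
    return json.loads(data)


@dataclass
class OrderCreate:  # hypothetical stand-in for a Pydantic model
    id: int
    sku: str


body = sketch_dumps({"task": "process_order", "args": [OrderCreate(id=1, sku="X")], "kwargs": {}})
parsed = sketch_loads(body)
```

After loading, the model argument is an ordinary dict, which is why validation back into a model is left to the worker.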

Dramatiq

class queuebridge.dramatiq.QueuebridgeEncoder[source]

Bases: JSONEncoder

Dramatiq encoder that runs queuebridge on message args/kwargs.

Metadata fields (queue_name, actor_name, message_id, etc.) are left untouched. Only args and kwargs are encoded/decoded.

encode(data)[source]

Convert message metadata into a bytestring.

Return type:

bytes

Parameters:

data (dict[str, Any])

decode(data)[source]

Convert a bytestring into message metadata.

Return type:

dict[str, Any]

Parameters:

data (bytes)
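The rule that only args and kwargs are transformed can be sketched as a small encoder class. This version does not subclass Dramatiq's JSONEncoder and takes its codec as constructor arguments, both of which are simplifications; SketchEncoder and the wrapping codec are hypothetical.

```python
import json
from typing import Any, Callable


class SketchEncoder:
    """Transform args/kwargs only; pass every other message field through."""

    def __init__(self, enc: Callable[[Any], Any], dec: Callable[[Any], Any]) -> None:
        self._enc, self._dec = enc, dec

    @staticmethod
    def _apply(data: dict, fn: Callable[[Any], Any]) -> dict:
        out = dict(data)  # shallow copy so the caller's message is not mutated
        for key in ("args", "kwargs"):
            if key in out:
                out[key] = fn(out[key])
        return out

    def encode(self, data: dict) -> bytes:
        return json.dumps(self._apply(data, self._enc)).encode("utf-8")

    def decode(self, data: bytes) -> dict:
        return self._apply(json.loads(data.decode("utf-8")), self._dec)


# Hypothetical codec that makes the transformation visible on the wire.
codec = SketchEncoder(enc=lambda v: {"wrapped": v}, dec=lambda v: v["wrapped"])
message = {"queue_name": "default", "actor_name": "process", "args": [1, 2], "kwargs": {"x": 1}}
raw = codec.encode(message)
restored = codec.decode(raw)
```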

queuebridge.dramatiq.register_queuebridge(broker=None)[source]

Install QueuebridgeEncoder via dramatiq.set_encoder().

Call once at process startup, before actors send messages. Idempotent.

Parameters:

broker (Broker | None) – Optional broker to set with dramatiq.set_broker().

Return type:

None

Example:

import dramatiq
from queuebridge.dramatiq import register_queuebridge

register_queuebridge()

class queuebridge.dramatiq.encoder.QueuebridgeEncoder[source]

Bases: JSONEncoder

Dramatiq encoder that runs queuebridge on message args/kwargs.

Metadata fields (queue_name, actor_name, message_id, etc.) are left untouched. Only args and kwargs are encoded/decoded.

encode(data)[source]

Convert message metadata into a bytestring.

Return type:

bytes

Parameters:

data (dict[str, Any])

decode(data)[source]

Convert a bytestring into message metadata.

Return type:

dict[str, Any]

Parameters:

data (bytes)

queuebridge.dramatiq.encoder.register_queuebridge(broker=None)[source]

Install QueuebridgeEncoder via dramatiq.set_encoder().

Call once at process startup, before actors send messages. Idempotent.

Parameters:

broker (Broker | None) – Optional broker to set with dramatiq.set_broker().

Return type:

None

Example:

import dramatiq
from queuebridge.dramatiq import register_queuebridge

register_queuebridge()

Arq

queuebridge.arq.get_serializer_pair()[source]

Return (serialize, deserialize) for Arq WorkerSettings and create_pool.

Both the worker and the client must use the same pair.

Return type:

tuple[Callable[[dict[str, Any]], bytes], Callable[[bytes], dict[str, Any]]]

Returns:

Tuple of callables compatible with Arq’s job_serializer / job_deserializer.

Example:

serialize, deserialize = get_serializer_pair()

class WorkerSettings:
    job_serializer = serialize
    job_deserializer = deserialize

queuebridge.arq.qb_task(fn)[source]

Decode wire args/kwargs before the wrapped async Arq task runs.

Apply it outside @validate_call, so that qb_task is the outermost decorator:

@qb_task
@validate_call
async def process(ctx, order: OrderCreate): ...

Parameters:

fn (TypeVar(F, bound= Callable[..., Awaitable[Any]])) – Async task function registered with Arq.

Return type:

TypeVar(F, bound= Callable[..., Awaitable[Any]])

Returns:

Wrapped coroutine with decoded arguments.
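A decorator of this shape can be sketched with functools.wraps and typing.get_type_hints. The conversion step here simply calls the annotation as a constructor, which is a placeholder for real decoding; sketch_task is a hypothetical name and only keyword arguments are handled.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, get_type_hints


def sketch_task(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Convert keyword arguments to their annotated types, then await fn."""
    hints = get_type_hints(fn)

    @functools.wraps(fn)
    async def wrapper(ctx: Any, *args: Any, **kwargs: Any) -> Any:
        # ctx is passed through untouched, as the docs describe for Arq tasks.
        decoded = {k: hints[k](v) if k in hints else v for k, v in kwargs.items()}
        return await fn(ctx, *args, **decoded)

    return wrapper


@sketch_task
async def process(ctx, count: int) -> int:
    return count + 1


result = asyncio.run(process({}, count="41"))
```

Because decoding happens in the outer wrapper, any validating decorator applied beneath it sees already-converted values.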

async queuebridge.arq.typed_result(job, return_type)[source]

Await job.result() and decode into a Pydantic model.

Parameters:
  • job (Any) – Arq Job instance.

  • return_type (type[TypeVar(T)]) – Expected result type.

Return type:

TypeVar(T)

Returns:

Decoded result (e.g. OrderResult).

Example:

job = await pool.enqueue_job("process_order", order=data)
result = await typed_result(job, OrderResult)

queuebridge.arq.install.get_serializer_pair()[source]

Return (serialize, deserialize) for Arq WorkerSettings and create_pool.

Both the worker and the client must use the same pair.

Return type:

tuple[Callable[[dict[str, Any]], bytes], Callable[[bytes], dict[str, Any]]]

Returns:

Tuple of callables compatible with Arq’s job_serializer / job_deserializer.

Example:

serialize, deserialize = get_serializer_pair()

class WorkerSettings:
    job_serializer = serialize
    job_deserializer = deserialize

queuebridge.arq.install.qb_task(fn)[source]

Decode wire args/kwargs before the wrapped async Arq task runs.

Apply it outside @validate_call, so that qb_task is the outermost decorator:

@qb_task
@validate_call
async def process(ctx, order: OrderCreate): ...

Parameters:

fn (TypeVar(F, bound= Callable[..., Awaitable[Any]])) – Async task function registered with Arq.

Return type:

TypeVar(F, bound= Callable[..., Awaitable[Any]])

Returns:

Wrapped coroutine with decoded arguments.

async queuebridge.arq.install.typed_result(job, return_type)[source]

Await job.result() and decode into a Pydantic model.

Parameters:
  • job (Any) – Arq Job instance.

  • return_type (type[TypeVar(T)]) – Expected result type.

Return type:

TypeVar(T)

Returns:

Decoded result (e.g. OrderResult).

Example:

job = await pool.enqueue_job("process_order", order=data)
result = await typed_result(job, OrderResult)

queuebridge.arq.serializer.serialize(d)[source]

Arq job serializer: msgpack over queuebridge-encoded dict.

Parameters:

d (dict[str, Any]) – Arq job dict (t, f, a, k, et, …).

Return type:

bytes

Returns:

msgpack bytes safe for Redis storage.

queuebridge.arq.serializer.deserialize(b)[source]

Arq job deserializer: unpack msgpack to wire dict.

Parameters:

b (bytes) – Bytes from the Arq queue.

Return type:

dict[str, Any]

Returns:

Job dict still in wire format; use @qb_task to decode at task boundary.
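The serialize/deserialize contract (dict in, bytes out, and back) can be illustrated with a matched pair of callables. This sketch substitutes JSON for msgpack so that it needs only the standard library; sketch_serializer_pair is a hypothetical name and the job dict values are made up for the example.

```python
import json
from typing import Any, Callable

Serializer = Callable[[dict], bytes]
Deserializer = Callable[[bytes], dict]


def sketch_serializer_pair() -> tuple:
    """Return matching (serialize, deserialize) callables; JSON replaces msgpack here."""

    def serialize(d: dict) -> bytes:
        return json.dumps(d).encode("utf-8")

    def deserialize(b: bytes) -> dict:
        return json.loads(b.decode("utf-8"))

    return serialize, deserialize


serialize, deserialize = sketch_serializer_pair()
# Key names follow the job dict fields listed above; the values are invented.
job: dict = {"t": 1, "f": "process_order", "a": [], "k": {"order": {"id": 1, "sku": "X"}}}
payload = serialize(job)
round_tripped = deserialize(payload)
```

The output of deserialize is still plain wire data, consistent with the note that decoding into models happens at the task boundary.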