API reference¶
This section is generated from the package source. For narrative guides, see the tutorials.
Core¶
queuebridge: bidirectional Pydantic serialization for task queues.
Pass Pydantic models to .delay(), .send(), or enqueue_job().
Get models back from results. Supports Celery, Dramatiq, and Arq.
Example:
from queuebridge import encode, decode
from queuebridge.celery import register_queuebridge, typed_result
- queuebridge.decode(value, hint=typing.Any, *, strict=False)[source]¶
Recursively decode a wire value to Python using an optional type hint.
- queuebridge.encode(value, *, tag_models=True)[source]¶
Recursively transform a value into a JSON-serializable structure.
- Parameters:
- Return type:
- Returns:
JSON-compatible primitives, lists, dicts, or tagged envelopes.
- Raises:
QueuebridgeEncodeError – If the type is not supported.
Codec¶
Core encode/decode codec and wire-format helpers for queuebridge.
- queuebridge.codec.class_fqn(obj)[source]¶
Return the fully-qualified name of a type or instance.
Example:
class_fqn(OrderCreate)->"myapp.models.OrderCreate".
- queuebridge.codec.import_fqn(fqn)[source]¶
Import and return a type from its fully-qualified name.
Used when decoding
__qb__envelopes. RespectsALLOWED_MODULE_PREFIXESwhen that tuple is non-empty.- Parameters:
fqn (
str) – e.g."myapp.models.OrderCreate".- Return type:
- Returns:
The resolved type.
- Raises:
QueuebridgeSecurityError – If the module prefix is blocked.
QueuebridgeDecodeError – If the FQN is invalid or not a type.
- queuebridge.codec.is_qb_envelope(value)[source]¶
Return True if
valueis a queuebridge__qb__tagged envelope.
- queuebridge.codec.encode(value, *, tag_models=True)[source]¶
Recursively transform a value into a JSON-serializable structure.
- Parameters:
- Return type:
- Returns:
JSON-compatible primitives, lists, dicts, or tagged envelopes.
- Raises:
QueuebridgeEncodeError – If the type is not supported.
- queuebridge.codec.decode_wire(value)[source]¶
Recursively unwrap
__qb__envelopes without type hints.Useful when you know the wire data contains tags but you do not have function annotations (e.g. Dramatiq message decode path).
Types and exceptions¶
Wire format constants, envelope types, and exceptions.
- queuebridge.types.QB_TAG = '__qb__'¶
JSON key for tagged queuebridge envelopes on the wire.
- queuebridge.types.QB_VERSION = 1¶
Current wire format version (stored in each envelope).
- queuebridge.types.ALLOWED_MODULE_PREFIXES: tuple[str, ...] = ()¶
Module prefixes allowed for FQN import during decode.
Empty tuple means all modules are allowed. Set to e.g.
("myapp.",)in v0.2+.
- class queuebridge.types.QBEnvelope[source]¶
Bases:
TypedDictShape of the inner envelope at
value[QB_TAG].
- exception queuebridge.types.QueuebridgeError[source]¶
Bases:
ExceptionBase exception for queuebridge.
- exception queuebridge.types.QueuebridgeEncodeError[source]¶
Bases:
QueuebridgeErrorRaised when a value cannot be encoded (unsupported type).
- exception queuebridge.types.QueuebridgeDecodeError[source]¶
Bases:
QueuebridgeErrorRaised when a wire value cannot be decoded.
- exception queuebridge.types.QueuebridgeSecurityError[source]¶
Bases:
QueuebridgeDecodeErrorRaised when FQN import is blocked by
ALLOWED_MODULE_PREFIXES.
Hints¶
Type hints utilities for decoding task arguments and return values.
- class queuebridge.hints.TaskSignature(params, return_type)[source]¶
Bases:
objectCached type hints for a task function.
- params¶
Mapping of parameter name to annotation (skips
self,cls,ctx).
- return_type¶
Return annotation, or
Anyif missing.
- queuebridge.hints.get_task_signature(fn)[source]¶
Extract and cache parameter/return type hints from a callable.
- Parameters:
fn (
Callable[...,Any]) – Task function (sync, async, or decorated).- Return type:
- Returns:
TaskSignaturewithparamsandreturn_type.
Celery¶
- queuebridge.celery.register_queuebridge(app, *, strict=False)[source]¶
Register the
queuebridge-jsonserializer on a Celery application.Configures
task_serializer,result_serializer, andaccept_content. Safe to call multiple times (idempotent).- Parameters:
app (
Celery) – Celery application instance (worker and producer must both call this).strict (
bool) – Reserved for future strict decode behavior on the client.
- Return type:
Example:
from celery import Celery from queuebridge.celery import register_queuebridge app = Celery("myapp", broker="redis://localhost:6379/0") register_queuebridge(app)
- queuebridge.celery.typed_result(async_result, return_type)[source]¶
Create a
TypedAsyncResultfor client-side typedget().- Parameters:
- Return type:
- Returns:
Wrapper whose
get()returnsreturn_typeinstead ofdict.
- queuebridge.celery.install.register_queuebridge(app, *, strict=False)[source]¶
Register the
queuebridge-jsonserializer on a Celery application.Configures
task_serializer,result_serializer, andaccept_content. Safe to call multiple times (idempotent).- Parameters:
app (
Celery) – Celery application instance (worker and producer must both call this).strict (
bool) – Reserved for future strict decode behavior on the client.
- Return type:
Example:
from celery import Celery from queuebridge.celery import register_queuebridge app = Celery("myapp", broker="redis://localhost:6379/0") register_queuebridge(app)
- class queuebridge.celery.result.TypedAsyncResult(async_result, return_type)[source]¶
Bases:
Generic[T]Wraps Celery
AsyncResultwith typedget().Proxies all other attributes (
.id,.state,.ready(), etc.) to the underlying async result.- Parameters:
Example:
ar = process_order.delay(OrderCreate(id=1, sku="X")) result = TypedAsyncResult(ar, OrderResult).get(timeout=10)
- queuebridge.celery.result.typed_result(async_result, return_type)[source]¶
Create a
TypedAsyncResultfor client-side typedget().- Parameters:
- Return type:
- Returns:
Wrapper whose
get()returnsreturn_typeinstead ofdict.
Dramatiq¶
- class queuebridge.dramatiq.QueuebridgeEncoder[source]¶
Bases:
JSONEncoderDramatiq encoder that runs queuebridge on message args/kwargs.
Metadata fields (
queue_name,actor_name,message_id, etc.) are left untouched. Onlyargsandkwargsare encoded/decoded.
- queuebridge.dramatiq.register_queuebridge(broker=None)[source]¶
Install
QueuebridgeEncoderviadramatiq.set_encoder().Call once at process startup, before actors send messages. Idempotent.
- Parameters:
broker (
Broker|None) – Optional broker to set withdramatiq.set_broker().- Return type:
Example:
import dramatiq from queuebridge.dramatiq import register_queuebridge register_queuebridge()
- class queuebridge.dramatiq.encoder.QueuebridgeEncoder[source]¶
Bases:
JSONEncoderDramatiq encoder that runs queuebridge on message args/kwargs.
Metadata fields (
queue_name,actor_name,message_id, etc.) are left untouched. Onlyargsandkwargsare encoded/decoded.
- queuebridge.dramatiq.encoder.register_queuebridge(broker=None)[source]¶
Install
QueuebridgeEncoderviadramatiq.set_encoder().Call once at process startup, before actors send messages. Idempotent.
- Parameters:
broker (
Broker|None) – Optional broker to set withdramatiq.set_broker().- Return type:
Example:
import dramatiq from queuebridge.dramatiq import register_queuebridge register_queuebridge()
Arq¶
- queuebridge.arq.get_serializer_pair()[source]¶
Return
(serialize, deserialize)for ArqWorkerSettingsandcreate_pool.Both the worker and the client must use the same pair.
- Return type:
tuple[Callable[[dict[str,Any]],bytes],Callable[[bytes],dict[str,Any]]]- Returns:
Tuple of callables compatible with Arq’s
job_serializer/job_deserializer.
Example:
serialize, deserialize = get_serializer_pair() class WorkerSettings: job_serializer = serialize job_deserializer = deserialize
- queuebridge.arq.qb_task(fn)[source]¶
Decode wire args/kwargs before the wrapped async Arq task runs.
Apply outside
@validate_call:@qb_task @validate_call async def process(ctx, order: OrderCreate): ...
- async queuebridge.arq.typed_result(job, return_type)[source]¶
Await
job.result()and decode into a Pydantic model.- Parameters:
- Return type:
TypeVar(T)- Returns:
Decoded result (e.g.
OrderResult).
Example:
job = await pool.enqueue_job("process_order", order=data) result = await typed_result(job, OrderResult)
- queuebridge.arq.install.get_serializer_pair()[source]¶
Return
(serialize, deserialize)for ArqWorkerSettingsandcreate_pool.Both the worker and the client must use the same pair.
- Return type:
tuple[Callable[[dict[str,Any]],bytes],Callable[[bytes],dict[str,Any]]]- Returns:
Tuple of callables compatible with Arq’s
job_serializer/job_deserializer.
Example:
serialize, deserialize = get_serializer_pair() class WorkerSettings: job_serializer = serialize job_deserializer = deserialize
- queuebridge.arq.install.qb_task(fn)[source]¶
Decode wire args/kwargs before the wrapped async Arq task runs.
Apply outside
@validate_call:@qb_task @validate_call async def process(ctx, order: OrderCreate): ...
- async queuebridge.arq.install.typed_result(job, return_type)[source]¶
Await
job.result()and decode into a Pydantic model.- Parameters:
- Return type:
TypeVar(T)- Returns:
Decoded result (e.g.
OrderResult).
Example:
job = await pool.enqueue_job("process_order", order=data) result = await typed_result(job, OrderResult)