Source code for queuebridge.codec

"""Core encode/decode codec and wire-format helpers for queuebridge."""

from __future__ import annotations

import importlib
from datetime import date, datetime, time
from decimal import Decimal
from enum import Enum
from typing import Any, Union, get_args, get_origin
from uuid import UUID

from pydantic import BaseModel, RootModel, TypeAdapter
from typing_extensions import get_origin as te_get_origin

from queuebridge.types import (
    ALLOWED_MODULE_PREFIXES,
    QB_TAG,
    QB_VERSION,
    QueuebridgeDecodeError,
    QueuebridgeEncodeError,
    QueuebridgeSecurityError,
)

_BUILTIN_DECODE: dict[str, type[Any]] = {
    "uuid.UUID": UUID,
    "datetime.datetime": datetime,
    "datetime.date": date,
    "datetime.time": time,
    "decimal.Decimal": Decimal,
}


[docs] def class_fqn(obj: type[Any] | object) -> str: """Return the fully-qualified name of a type or instance. Example: ``class_fqn(OrderCreate)`` -> ``"myapp.models.OrderCreate"``. Args: obj: A class or instance. Returns: ``"{module}.{qualname}"`` string used in ``__qb__`` envelopes. """ cls = obj if isinstance(obj, type) else type(obj) return f"{cls.__module__}.{cls.__qualname__}"
[docs] def import_fqn(fqn: str) -> type[Any]: """Import and return a type from its fully-qualified name. Used when decoding ``__qb__`` envelopes. Respects ``ALLOWED_MODULE_PREFIXES`` when that tuple is non-empty. Args: fqn: e.g. ``"myapp.models.OrderCreate"``. Returns: The resolved type. Raises: QueuebridgeSecurityError: If the module prefix is blocked. QueuebridgeDecodeError: If the FQN is invalid or not a type. """ module_name, _, qualname = fqn.rpartition(".") if not module_name: raise QueuebridgeDecodeError(f"invalid FQN: {fqn}") if ALLOWED_MODULE_PREFIXES and not module_name.startswith(ALLOWED_MODULE_PREFIXES): raise QueuebridgeSecurityError(f"import blocked: {fqn}") module = importlib.import_module(module_name) obj: Any = module for part in qualname.split("."): obj = getattr(obj, part) if not isinstance(obj, type): raise QueuebridgeDecodeError(f"FQN does not resolve to a type: {fqn}") return obj
def _make_envelope(type_name: str, payload: Any) -> dict[str, Any]: return {QB_TAG: {"t": type_name, "v": QB_VERSION, "d": payload}}
[docs] def is_qb_envelope(value: Any) -> bool: """Return True if ``value`` is a queuebridge ``__qb__`` tagged envelope.""" if not isinstance(value, dict) or QB_TAG not in value: return False inner = value[QB_TAG] return ( isinstance(inner, dict) and "t" in inner and "v" in inner and "d" in inner )
def _origin(hint: Any) -> Any: origin = get_origin(hint) if origin is None: origin = te_get_origin(hint) return origin def _is_base_model_type(tp: Any) -> bool: return isinstance(tp, type) and issubclass(tp, BaseModel) def _decode_envelope(value: dict[str, Any]) -> Any: inner = value[QB_TAG] type_name: str = inner["t"] payload: Any = inner["d"] if type_name in _BUILTIN_DECODE: builtin = _BUILTIN_DECODE[type_name] if builtin is UUID: return UUID(payload) if builtin is datetime: return datetime.fromisoformat(payload) if builtin is date: return date.fromisoformat(payload) if builtin is time: return time.fromisoformat(payload) if builtin is Decimal: return Decimal(payload) tag_type = import_fqn(type_name) if _is_base_model_type(tag_type): return tag_type.model_validate(payload) if isinstance(tag_type, type) and issubclass(tag_type, Enum): return tag_type(payload) return TypeAdapter(tag_type).validate_python(payload)
[docs] def encode(value: Any, *, tag_models: bool = True) -> Any: """Recursively transform a value into a JSON-serializable structure. Args: value: Python object to encode (models, UUID, datetime, containers, etc.). tag_models: When True, wrap ``BaseModel`` instances in ``__qb__`` envelopes. Returns: JSON-compatible primitives, lists, dicts, or tagged envelopes. Raises: QueuebridgeEncodeError: If the type is not supported. """ if value is None or isinstance(value, (bool, int, float, str)): return value if is_qb_envelope(value): return value if isinstance(value, BaseModel): dumped = value.model_dump(mode="json") if not tag_models: return dumped return _make_envelope(class_fqn(value), dumped) if isinstance(value, RootModel): inner = value.root if isinstance(inner, BaseModel): dumped = inner.model_dump(mode="json") if not tag_models: return dumped return _make_envelope(class_fqn(inner), dumped) return encode(inner, tag_models=tag_models) if isinstance(value, UUID): return _make_envelope("uuid.UUID", str(value)) if isinstance(value, datetime): return _make_envelope("datetime.datetime", value.isoformat()) if isinstance(value, date): return _make_envelope("datetime.date", value.isoformat()) if isinstance(value, time): return _make_envelope("datetime.time", value.isoformat()) if isinstance(value, Decimal): return _make_envelope("decimal.Decimal", str(value)) if isinstance(value, Enum): return _make_envelope(class_fqn(value.__class__), value.value) if isinstance(value, dict): return {str(encode(k, tag_models=tag_models)): encode(v, tag_models=tag_models) for k, v in value.items()} if isinstance(value, (list, tuple, set)): return [encode(item, tag_models=tag_models) for item in value] raise QueuebridgeEncodeError(f"unsupported type: {type(value)}")
def _decode_union(value: Any, hint: Any, *, strict: bool) -> Any: args = get_args(hint) if not args: return value adapter = TypeAdapter(hint) try: return adapter.validate_python(value) except Exception: pass last_error: Exception | None = None for arm in args: if arm is type(None) and value is None: return None try: return decode(value, arm, strict=False) except (QueuebridgeDecodeError, Exception) as exc: last_error = exc continue if strict: msg = f"cannot decode union value: {value!r}" if last_error: msg = f"{msg} (last error: {last_error})" raise QueuebridgeDecodeError(msg) return value
[docs] def decode_wire(value: Any) -> Any: """Recursively unwrap ``__qb__`` envelopes without type hints. Useful when you know the wire data contains tags but you do not have function annotations (e.g. Dramatiq message decode path). Args: value: Wire data (nested dicts/lists with optional envelopes). Returns: Python objects with all envelopes resolved. """ if is_qb_envelope(value): return _decode_envelope(value) if isinstance(value, dict): return {k: decode_wire(v) for k, v in value.items()} if isinstance(value, list): return [decode_wire(v) for v in value] return value
[docs] def decode(value: Any, hint: Any = Any, *, strict: bool = False) -> Any: """Recursively decode a wire value to Python using an optional type hint. Args: value: Wire data (primitives, containers, or ``__qb__`` envelopes). hint: Type annotation used for validation (e.g. ``OrderCreate``). strict: Raise ``QueuebridgeDecodeError`` when decoding fails. Returns: Decoded Python object. """ if value is None or isinstance(value, (bool, int, float, str)): return value if is_qb_envelope(value): return _decode_envelope(value) origin = _origin(hint) if origin is list: inner_args = get_args(hint) inner = inner_args[0] if inner_args else Any if isinstance(value, list): return [decode(item, inner, strict=strict) for item in value] if strict: raise QueuebridgeDecodeError(f"expected list, got {type(value)}") return value if origin is tuple: inner_args = get_args(hint) if isinstance(value, list): if inner_args and len(inner_args) == 2 and inner_args[1] is ...: return tuple(decode(item, inner_args[0], strict=strict) for item in value) if inner_args: return tuple( decode(item, inner_args[i] if i < len(inner_args) else Any, strict=strict) for i, item in enumerate(value) ) return tuple(value) if strict: raise QueuebridgeDecodeError(f"expected list for tuple hint, got {type(value)}") return value if origin is set: inner_args = get_args(hint) inner = inner_args[0] if inner_args else Any if isinstance(value, list): return {decode(item, inner, strict=strict) for item in value} if strict: raise QueuebridgeDecodeError(f"expected list for set hint, got {type(value)}") return value if origin is dict: args = get_args(hint) key_hint = args[0] if args else Any val_hint = args[1] if len(args) > 1 else Any if isinstance(value, dict): return { decode(k, key_hint, strict=strict): decode(v, val_hint, strict=strict) for k, v in value.items() } if strict: raise QueuebridgeDecodeError(f"expected dict, got {type(value)}") return value if origin is Union: return _decode_union(value, hint, strict=strict) if _is_base_model_type(hint) and isinstance(value, dict): return hint.model_validate(value) if hint is not Any: try: return TypeAdapter(hint).validate_python(value) except Exception as exc: if strict: raise QueuebridgeDecodeError(f"cannot decode value with hint {hint!r}: {exc}") from exc return value if strict: raise QueuebridgeDecodeError(f"cannot decode value without hint: {value!r}") return value