Source code for queuebridge.arq.serializer
from __future__ import annotations
from typing import Any
import msgpack
from queuebridge.codec import encode
[docs]
def serialize(d: dict[str, Any]) -> bytes:
"""Arq job serializer: msgpack over queuebridge-encoded dict.
Args:
d: Arq job dict (``t``, ``f``, ``a``, ``k``, ``et``, ...).
Returns:
msgpack bytes safe for Redis storage.
"""
packed: bytes = msgpack.packb(encode(d), use_bin_type=True)
return packed
[docs]
def deserialize(b: bytes) -> dict[str, Any]:
"""Arq job deserializer: unpack msgpack to wire dict.
Args:
b: Bytes from the Arq queue.
Returns:
Job dict still in wire format; use ``@qb_task`` to decode at task boundary.
"""
result = msgpack.unpackb(b, raw=False)
if not isinstance(result, dict):
raise TypeError(f"expected dict from deserializer, got {type(result)}")
return result