# Source code for queuebridge.dramatiq.encoder
from __future__ import annotations
import dramatiq
from dramatiq import JSONEncoder
from dramatiq.encoder import MessageData
from queuebridge.codec import decode_wire, encode
_installed = False
[docs]
class QueuebridgeEncoder(JSONEncoder):
    """Dramatiq JSON encoder that routes message arguments through queuebridge.

    Only the ``args`` and ``kwargs`` entries of a message are transformed.
    Every other field (``queue_name``, ``actor_name``, ``message_id``, ...)
    is passed to or from the base ``JSONEncoder`` as-is.
    """

    @staticmethod
    def _convert_arguments(message, transform):
        """Return a shallow copy of *message* with *transform* applied to
        its ``args`` and ``kwargs`` entries (when present), in that order.
        """
        converted = dict(message)
        for field in ("args", "kwargs"):
            if field in converted:
                converted[field] = transform(converted[field])
        return converted

    def encode(self, data: MessageData) -> bytes:
        """Serialize *data* after running queuebridge ``encode`` on args/kwargs.

        The transformation happens on a copy, so the mapping passed in by the
        caller is not modified.
        """
        # ``encode`` here is the module-level queuebridge function, not this
        # method: method bodies do not see class-scope names.
        return super().encode(self._convert_arguments(data, encode))

    def decode(self, data: bytes) -> MessageData:
        """Deserialize *data*, then run ``decode_wire`` on args/kwargs.

        The base ``JSONEncoder`` parses the bytes first; the queuebridge
        decoding is applied to a copy of its result.
        """
        return self._convert_arguments(super().decode(data), decode_wire)
[docs]
def register_queuebridge(broker: dramatiq.Broker | None = None) -> None:
    """Make :class:`QueuebridgeEncoder` the global dramatiq encoder.

    Intended to be called at process startup, before any actor sends a
    message. The encoder is installed only on the first call; later calls
    skip that step. A ``broker`` argument, when given, is applied on every
    call.

    Args:
        broker: Optional broker passed to ``dramatiq.set_broker()``.

    Example::

        import dramatiq
        from queuebridge.dramatiq import register_queuebridge

        register_queuebridge()
    """
    global _installed

    # Install the encoder once per process; the module flag remembers it.
    if not _installed:
        dramatiq.set_encoder(QueuebridgeEncoder())

    # The broker is (re)applied whether or not the encoder was just installed.
    if broker is not None:
        dramatiq.set_broker(broker)

    # Flip the flag last so a failure above leaves the module "not installed".
    _installed = True