Source code for queuebridge.celery.serializer
from __future__ import annotations
import json
from typing import Any
from queuebridge.codec import encode
[docs]
def dumps(obj: Any) -> str:
"""Kombu encoder: JSON-serialize a Celery message body.
Uses ``encode(..., tag_models=False)`` so Celery ``pydantic=True`` can
validate plain dicts on the worker.
Args:
obj: Celery/kombu message dict (``task``, ``args``, ``kwargs``, ...).
Returns:
JSON string.
"""
return json.dumps(encode(obj, tag_models=False))
[docs]
def loads(data: str) -> Any:
"""Kombu decoder: parse JSON message body.
Returns wire dicts; worker-side ``pydantic=True`` validates models.
Args:
data: JSON string from the broker.
Returns:
Parsed message dict.
"""
return json.loads(data)