Source code for queuebridge.celery.install
from __future__ import annotations
from typing import TYPE_CHECKING
from kombu.serialization import register
from queuebridge.celery.serializer import dumps, loads
if TYPE_CHECKING:
from celery import Celery
[docs]
def register_queuebridge(app: Celery, *, strict: bool = False) -> None:
"""Register the ``queuebridge-json`` serializer on a Celery application.
Configures ``task_serializer``, ``result_serializer``, and ``accept_content``.
Safe to call multiple times (idempotent).
Args:
app: Celery application instance (worker and producer must both call this).
strict: Reserved for future strict decode behavior on the client.
Example::
from celery import Celery
from queuebridge.celery import register_queuebridge
app = Celery("myapp", broker="redis://localhost:6379/0")
register_queuebridge(app)
"""
if getattr(app, "_queuebridge_installed", False):
return
register(
"queuebridge-json",
dumps,
loads,
content_type="application/json",
content_encoding="utf-8",
)
app.conf.update(
task_serializer="queuebridge-json",
result_serializer="queuebridge-json",
accept_content=["json", "queuebridge-json"],
result_accept_content=["json", "queuebridge-json"],
)
app._queuebridge_installed = True
app._queuebridge_strict = strict