Source code for queuebridge.arq.install

from __future__ import annotations

import functools
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar, cast

from queuebridge.arq.serializer import deserialize, serialize
from queuebridge.codec import decode
from queuebridge.hints import decode_args

T = TypeVar("T")
F = TypeVar("F", bound=Callable[..., Awaitable[Any]])


[docs] def get_serializer_pair() -> tuple[Callable[[dict[str, Any]], bytes], Callable[[bytes], dict[str, Any]]]: """Return ``(serialize, deserialize)`` for Arq ``WorkerSettings`` and ``create_pool``. Both the worker and the client **must** use the same pair. Returns: Tuple of callables compatible with Arq's ``job_serializer`` / ``job_deserializer``. Example:: serialize, deserialize = get_serializer_pair() class WorkerSettings: job_serializer = serialize job_deserializer = deserialize """ return serialize, deserialize
[docs] def qb_task(fn: F) -> F: """Decode wire args/kwargs before the wrapped async Arq task runs. Apply **outside** ``@validate_call``:: @qb_task @validate_call async def process(ctx, order: OrderCreate): ... Args: fn: Async task function registered with Arq. Returns: Wrapped coroutine with decoded arguments. """ @functools.wraps(fn) async def wrapper(*args: Any, **kwargs: Any) -> Any: decoded_args, decoded_kwargs = decode_args(fn, args, kwargs) return await fn(*decoded_args, **decoded_kwargs) return wrapper # type: ignore[return-value]
[docs] async def typed_result(job: Any, return_type: type[T]) -> T: """Await ``job.result()`` and decode into a Pydantic model. Args: job: Arq :class:`~arq.jobs.Job` instance. return_type: Expected result type. Returns: Decoded result (e.g. ``OrderResult``). Example:: job = await pool.enqueue_job("process_order", order=data) result = await typed_result(job, OrderResult) """ raw = await job.result() return cast(T, decode(raw, return_type))