Source code for queuebridge.celery.result
from __future__ import annotations
from typing import Any, Generic, TypeVar, cast
from queuebridge.codec import decode
T = TypeVar("T")
[docs]
class TypedAsyncResult(Generic[T]):
"""Wraps Celery :class:`~celery.result.AsyncResult` with typed ``get()``.
Proxies all other attributes (``.id``, ``.state``, ``.ready()``, etc.) to
the underlying async result.
Args:
async_result: Celery async result from ``task.delay()`` or ``apply_async()``.
return_type: Pydantic model or type hint for the task return value.
Example::
ar = process_order.delay(OrderCreate(id=1, sku="X"))
result = TypedAsyncResult(ar, OrderResult).get(timeout=10)
"""
[docs]
def __init__(self, async_result: Any, return_type: type[T]) -> None:
self._ar = async_result
self._return_type = return_type
[docs]
def get(self, *args: Any, **kwargs: Any) -> T:
"""Block until ready and decode the result to ``return_type``."""
raw = self._ar.get(*args, **kwargs)
return cast(T, decode(raw, self._return_type))
def __getattr__(self, name: str) -> Any:
return getattr(self._ar, name)
[docs]
def typed_result(async_result: Any, return_type: type[T]) -> TypedAsyncResult[T]:
"""Create a :class:`TypedAsyncResult` for client-side typed ``get()``.
Args:
async_result: Celery async result.
return_type: Expected return type (usually a Pydantic model).
Returns:
Wrapper whose ``get()`` returns ``return_type`` instead of ``dict``.
"""
return TypedAsyncResult(async_result, return_type)