Reputation: 127
I got the following code:
CeleryTask: TypeAlias = "Task[Any, Any]"
@celery.task(
bind=True,
...
)
def handle_webhook(self: CeleryTask, webhook: Webhook) -> None:
# do stuff
Then I have a dictionary like this:
webhook_map: dict[str, Callable[[Task, 'Webhook'], None]] = {
"order.webhook": handle_webhook
}
I also tried different ways, like this:
webhook_map: dict[str, (CeleryTask, Webhook)] = {
"order.webhook": handle_webhook
}
webhook_map: dict[str, CeleryTask] = {
"order.webhook": handle_webhook
}
But none of this works. When I later fetch the value
from the dict
, MyPy is complaining that delay
method does not exist on the task.
How can I properly save the CeleryTasks in my dict
and call delay
to schedule them later? Is this possible in Python? Thanks!
Upvotes: 0
Views: 42