Davuz
Davuz

Reputation: 5276

How to make sure a Celery task was enqueued in Pytest?

I have an API endpoint to register new user. The "welcome email" will enqueue and do this task async. I have 2 unit tests to check:

  1. Api does save user's information to DB OK
  2. The Celery task does send email with right content+template

I want to add 3rd unit test to ensure "The endpoint has to enqueue email-sending after saving user form to DB"

I try with celery.AsyncResult but it ask me to run the worker. For further, even if the worker is ready, we still can't verify the task was enqueued or not because the ambiguous PENDING state:

  1. Task exists in queue but not execute yet: PENDING
  2. Task doesn't exist in queue: PENDING

Does anyone face this problem? How do I solve it?

Upvotes: 3

Views: 1487

Answers (1)

DejanLekic
DejanLekic

Reputation: 19787

Common way to solve this problem in testing environments is to use the task_always_eager configuration setting, which basically instructs Celery to run the task like a regular function. Instead of the AsyncResult, Celery will make an object of the EagerResult type that behaves the same, but has completely different execution logic.

Upvotes: 1

Related Questions