Reputation: 41
Suppose all my tasks on a celery queue are hitting a 3rd party API. However, the API has a rate limit, which I am keeping track of (there is a day limit and hourly limit which I need to respect). As soon as I hit the rate limit, I want to pause consumption of new tasks, and then resume when I know I am good.
I achieved this by using the following two tasks:
@celery.task()
def cancel_api_queue(minutes_to_resume):
resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
celery.control.cancel_consumer('third_party', reply=True)
@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
celery.control.add_consumer('third_party', destination=['y@local'])
Then I can keep submitting my 3rd party API tasks, and as soon as my consumer is added back, all my tasks get consumed. Great.
However, since I have no consumer on this queue, this seems to mean I cannot see the jobs that are being submitted in Flower any more (until my consumer gets added).
Is there something I am doing wrong? Can I achieve this 'pause' another way to allow me to continue to see submitted jobs in flower?
p.s. maybe this is related to this issue, but not 100% sure: https://github.com/celery/celery/issues/1452
I am using amqp broker if that makes a difference.
thanks girls and boys.
Upvotes: 4
Views: 2087
Reputation: 11430
I'd suspect that peeking into contents of the queue messages before a worker picks them up is not really part of Flower's intended design. Therefore, if you stop consuming tasks from a queue, the best Flower can do is show you how many of them have been enqueued as a single number on the "Broker" pane.
One hackish way to observe the internals of the incoming tasks could be to add an intermediate dummy "forwarding" task, which simply forwards the message from one queue (let us call it query_inbox
) to another (say, query_processing
).
E.g. something like:
@celery.task(queue='query_inbox')
def query(params):
process_query.delay(params)
@celery.task(queue='query_processing')
def process_query(params):
... do rate-limited stuff ...
Now you may stop consuming tasks from query_processing
, but you will still be able to observe their parameters as they flow through the query_inbox
worker.
Upvotes: 2