kviktor

Reputation: 1088

Celery throwing BacklogLimitExceeded

I start a task that updates its state periodically and watch the result. However, after the second cycle, the calling side throws a BacklogLimitExceeded exception (the task itself finishes successfully after a while).

calling side:

    task = signature("worker.taskname", args=(url,), queue="worker")
    g = group(task).apply_async()
    while not g.ready():
        print(g[0].result)
        time.sleep(5)

task side:

    with open(filename, "wb") as w:
        fd = stream.open()
        while True:
            data = fd.read(2048)
            if data:
                w.write(data)
                size = w.tell()
                # taskname.update_state(meta={'size': size})
            else:
                break

(If I comment that update_state line out, everything works fine.)

I'm on Ubuntu 14.04, using RabbitMQ as both the broker and the result backend. Any idea how to fix this?

Here is the exact stack trace:

Traceback (most recent call last):
  File "main.py", line 55, in <module>
    while not g.ready():
  File "python3.4/site-packages/celery/result.py", line 503, in ready
    return all(result.ready() for result in self.results)
  File "python3.4/site-packages/celery/result.py", line 503, in <genexpr>
    return all(result.ready() for result in self.results)
  File "python3.4/site-packages/celery/result.py", line 259, in ready
    return self.state in self.backend.READY_STATES
  File "python3.4/site-packages/celery/result.py", line 394, in state
    return self._get_task_meta()['status']
  File "python3.4/site-packages/celery/result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "python3.4/site-packages/celery/backends/amqp.py", line 180, in get_task_meta
    raise self.BacklogLimitExceeded(task_id)
celery.backends.amqp.BacklogLimitExceeded: 0a4fb653-0f05-48dc-ac43-fb0c8fbaba9a

Upvotes: 6

Views: 1353

Answers (2)

hepcat72

Reputation: 1124

For any other noobs finding themselves here like me and not finding the "limit" to increase in the settings (maybe it went away in later versions? I don't know...): I solved this problem by making sure I issue fewer than 1000 status updates.

You can calculate a percentage and only issue a status update when it ticks up by 1:

    last_prcnt = 0
    for i in range(10000):
        prcnt = int(i / 10000 * 100)
        do_work()
        if last_prcnt != prcnt:  # fires ~100 times instead of 10000
            last_prcnt = prcnt
            self.update_state(state='PENDING', meta={'current': prcnt, 'total': 100})

I found that this prevented me from hitting the BacklogLimitExceeded exception. Just put that percentage logic inside your task's code.

I'm sure someone else could provide a slicker answer, but once I understood what was actually being exceeded, which seemed somewhat abstract to me (what's a "message"? It's just the calls to update_state), the solution was pretty straightforward.
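To make that concrete, here is a minimal sketch of the same throttling inside a bound task. The app setup, the process task, and the handle() helper are hypothetical stand-ins for illustration, not from the original post:

    from celery import Celery

    app = Celery("worker", broker="amqp://", backend="rpc://")

    def handle(item):
        pass  # stand-in for the real per-item work

    @app.task(bind=True)
    def process(self, items):
        last_prcnt = 0
        total = len(items)
        for i, item in enumerate(items):
            handle(item)
            prcnt = int(i / total * 100)
            if prcnt != last_prcnt:  # at most ~100 updates, well under 1000
                last_prcnt = prcnt
                self.update_state(state='PENDING',
                                  meta={'current': prcnt, 'total': 100})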

Upvotes: 1

kwelsh

Reputation: 106

I recently received this error with Redis as a backend and dug into it a bit more. The error is raised when there are more than 1000 result messages on the backend; when the polling loop hits this default limit, you get this exception.

There are some knobs that might be helpful, result_expires being one of them. You can also increase the limit above 1000.

http://docs.celeryproject.org/en/latest/userguide/configuration.html#redis-backend-settings
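For what it's worth, a minimal configuration sketch: result_expires is a real Celery setting, though the one-hour value here is just an example. Expiring results keeps them from piling up past the limit:

    # celeryconfig.py
    result_expires = 3600  # expire stored results after one hour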

Upvotes: 2
