bigblind

Reputation: 12867

Weird error with Redis and Celery

I'm getting the following error in one of my Celery workers:

2015-07-21T15:02:04.010066+00:00 app[worker.1]: Traceback (most recent call last):
2015-07-21T15:02:04.010069+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/app/trace.py", line 296, in trace_task
2015-07-21T15:02:04.010070+00:00 app[worker.1]:     on_chord_part_return(task, state, R)
2015-07-21T15:02:04.010071+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 587, in on_chord_part_return
2015-07-21T15:02:04.010073+00:00 app[worker.1]:     deps.delete()
2015-07-21T15:02:04.010074+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/result.py", line 773, in delete
2015-07-21T15:02:04.010076+00:00 app[worker.1]:     (backend or self.app.backend).delete_group(self.id)
2015-07-21T15:02:04.010078+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 329, in delete_group
2015-07-21T15:02:04.010079+00:00 app[worker.1]:     return self._delete_group(group_id)
2015-07-21T15:02:04.010081+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 499, in _delete_group
2015-07-21T15:02:04.010082+00:00 app[worker.1]:     self.delete(self.get_key_for_group(group_id))
2015-07-21T15:02:04.010083+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/redis.py", line 172, in delete
2015-07-21T15:02:04.010084+00:00 app[worker.1]:     self.client.delete(key)
2015-07-21T15:02:04.010085+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 824, in delete
2015-07-21T15:02:04.010087+00:00 app[worker.1]:     return self.execute_command('DEL', *names)
2015-07-21T15:02:04.010088+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 565, in execute_command
2015-07-21T15:02:04.010089+00:00 app[worker.1]:     return self.parse_response(connection, command_name, **options)
2015-07-21T15:02:04.010090+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 579, in parse_response
2015-07-21T15:02:04.010091+00:00 app[worker.1]:     return self.response_callbacks[command_name](response, **options)
2015-07-21T15:02:04.010093+00:00 app[worker.1]: ValueError: invalid literal for int() with base 10: 'QUEUED'

What I find weird is that I see no call to int in the last line of the stack trace. The 'QUEUED' value probably comes from a custom task status that I set myself, like this:

import logging

from celery import current_app
from celery.signals import before_task_publish


@before_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
    # the task may not exist if sent using `send_task`, which
    # sends tasks by name, so fall back to the default result
    # backend in that case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
    logging.debug("Setting status for %s", body["id"])

    backend.store_result(body['id'], None, "QUEUED")

What could be the issue here?


In case it's relevant, here's the code for my tasks. The only one I call directly is fetch.

@app.task
def fetch(url_or_urls, subscribe=None):
    """This fetches a (list of) podcast(s) and stores it in the db. It assumes that it only gets called
    by Podcast.get_by_url, or some other method that knows whether a given podcast has
    already been fetched.

    If *subscribe* is given, it should be a User instance to be subscribed to the given podcasts."""
    if isinstance(url_or_urls, basestring):
        url_or_urls = [url_or_urls]
    body = _store_podcasts.s()
    if subscribe:
        body.link(_subscribe_user.s(user=subscribe))
    return chord([_fetch_podcast_data.s(url) for url in url_or_urls])(body)

@app.task
def _fetch_podcast_data(url):
    return do_fetch(url) # This function returns a dict of podcast data.

@app.task
def _store_podcasts(podcasts_data):
    """Given a list of dictionaries representing podcasts, store them all in the database."""
    podcasts = [Podcast(**pdata) for pdata in podcasts_data]
    return Podcast.objects.insert(podcasts)

@app.task
def _subscribe_user(podcasts, user):
    """Subscribe the given users to all the podcasts in the list."""
    return user.subscribe_multi(podcasts)
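For readers unfamiliar with chords, the `chord(header)(body)` call above can be simulated synchronously with plain functions (the names below are illustrative stand-ins, not the real tasks): every header task runs, then the list of their results is handed to the body in a single call.

```python
def fake_fetch(url):
    # stands in for _fetch_podcast_data: returns a dict of podcast data
    return {"url": url, "title": "episode"}

def fake_store(podcasts_data):
    # stands in for _store_podcasts: "inserts" and returns identifiers
    return [p["url"] for p in podcasts_data]

# chord semantics, simulated synchronously: run all header tasks,
# then pass the list of their results to the body task.
header_results = [fake_fetch(u) for u in ["http://a.example", "http://b.example"]]
body_result = fake_store(header_results)
print(body_result)  # ['http://a.example', 'http://b.example']
```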

Is there anything else that could be relevant here?


Library versions as shown by pip freeze:

redis==2.10.3
celery==3.1.18

Upvotes: 7

Views: 2118

Answers (3)

Cockcrow

Reputation: 11

I got the same error recently, and found that my QUEUED response comes from Redis MULTI commands. See https://redis.io/topics/transactions#usage.

It may be that you are reading a response from the wrong connection, perhaps due to multi-processing, multi-threading, eventlet, etc. I'm not sure.
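To illustrate (a sketch, no server or client library needed): inside a MULTI/EXEC transaction, Redis acknowledges every queued command with the simple string QUEUED and only returns the real results at EXEC, so a reader that expects a command's own reply can pick up QUEUED instead.

```python
# Simulated reply stream on one Redis connection during a MULTI/EXEC
# transaction (values as described in the Redis transactions docs).
reply_stream = [
    "OK",      # reply to MULTI
    "QUEUED",  # reply to DEL key1   (queued, not executed yet)
    "QUEUED",  # reply to INCR hits  (queued)
    [1, 42],   # reply to EXEC: the real results of both commands
]

# If another thread or greenlet shares this connection and reads a
# reply while the transaction is open, it can receive "QUEUED" where
# it expects the integer result of its own DEL - and redis-py then
# passes that string to int(), raising exactly the ValueError above.
```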

Upvotes: 0

singer

Reputation: 2636

It is hard to debug such a bug without working code, but here is what I think it could be. Let's start here:

http://celery.readthedocs.org/en/latest/_modules/celery/backends/base.html#BaseBackend.store_result

def store_result(self, task_id, result, status,
                 traceback=None, request=None, **kwargs):
    """Update task state and result."""
    result = self.encode_result(result, status)
    self._store_result(task_id, result, status, traceback,
                       request=request, **kwargs)
    return result

It calls encode_result. Let's check that out:

def encode_result(self, result, status):
    if status in self.EXCEPTION_STATES and isinstance(result, Exception):
        return self.prepare_exception(result)
    else:
        return self.prepare_value(result)

It looks like "status" is expected to be something from predefined STATE constants.

Its code is here

http://celery.readthedocs.org/en/latest/_modules/celery/states.html#state

And docs here

http://celery.readthedocs.org/en/latest/reference/celery.states.html

It does not look like something like "QUEUED" is expected there. Try one of the predefined states.
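As a quick check, here are the built-in task states as listed in the celery.states documentation (reproduced as plain sets here rather than importing Celery):

```python
# Built-in task states per the celery.states docs.
PREDEFINED_STATES = {"PENDING", "RECEIVED", "STARTED",
                     "SUCCESS", "FAILURE", "RETRY", "REVOKED"}

# The states encode_result treats as exceptional.
EXCEPTION_STATES = {"FAILURE", "RETRY", "REVOKED"}

print("QUEUED" in PREDEFINED_STATES)  # False - not a recognized state
```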

Upvotes: 3

garnertb

Reputation: 9584

The redis Python package expects the response from the DEL command to always be an integer, which is the number of keys deleted.

The call to int happens in the last line (return self.response_callbacks[command_name](response, **options)) where self.response_callbacks['DEL'] is equal to int.
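A stripped-down sketch of that dispatch mechanism (a model of the behavior, not redis-py's actual code): the raw reply for each command is fed to a per-command callback, and for DEL that callback is simply int.

```python
# Minimal model of redis-py's response parsing: a per-command
# callback converts the raw reply, and DEL maps straight to int().
response_callbacks = {"DEL": int}

def parse_response(command_name, raw_reply):
    return response_callbacks[command_name](raw_reply)

print(parse_response("DEL", "2"))  # 2 - two keys deleted

# A stray "QUEUED" reply reproduces the error from the question:
# parse_response("DEL", "QUEUED")
#   -> ValueError: invalid literal for int() with base 10: 'QUEUED'
```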

As a workaround, you could subclass redis.client.StrictRedis and set the DEL response callback to something other than int; just make sure you're familiar with the implications.
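A sketch of that workaround, assuming redis-py's set_response_callback method (present in redis 2.10). The lenient callback below is hypothetical, and note that it only masks the symptom: the mixed-up reply stream is still there.

```python
def lenient_int(response, **options):
    # Hypothetical replacement callback: fall back to 0 instead of
    # crashing when the reply is not numeric (e.g. "QUEUED").
    try:
        return int(response)
    except (TypeError, ValueError):
        return 0

# Hooking it up (commented out, as it needs a running Redis server):
# import redis
# client = redis.StrictRedis()
# client.set_response_callback('DEL', lenient_int)
```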

Upvotes: 2
