Salim Fadhley
Salim Fadhley

Reputation: 8245

Adding a callback to an asynchronous Celery task

I have a time-consuming celery task which produces a result. I'd like to execute a local callback (or any callback) as soon as that result is available.

My expectation was that I could asynchronously invoke the function which generates the data and then have a callback execute when that function has returned a result. For the purposes of this demo, I want to prove that it works by calling a trivial function like log.info - but in real life I might want to do something more complex like pass the data onto another function which will store the result.

Is there a simple way to do it? When I run my script I get an error that my task cannot be serialized. I get pretty much the same error if I substitute the celery task for a normal function.

stoneid_1  |   File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 59, in default
stoneid_1  |     return super(JSONEncoder, self).default(o)
stoneid_1  |   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
stoneid_1  |     o.__class__.__name__)
stoneid_1  | kombu.exceptions.EncodeError: Object of type 'log_some_information' is not JSON serializable

Here's my whole script:

import random
import time
import logging
import socket
from collections import namedtuple

log: logging.Logger = logging.getLogger()

from stoneid.celery.celery import app


TimeConsumingResult = namedtuple("TimeConsumingResult", ["word", "hostname"])


@app.task()
def time_consuming_thing()->TimeConsumingResult:
    delay:float = random.randrange(0,3)
    time.sleep(delay)

    items = ["zip", "zap", "zop"]
    return TimeConsumingResult(random.choice(items), socket.gethostname())


@app.task()
def log_some_information(info:TimeConsumingResult)->None:
    log.info("Message: %s, host: %s", info.word, info.hostname)


def main():
    while True:
        time.sleep(10)
        log.info("About to schedule a task:")
        time_consuming_thing.apply_async(link=log_some_information)


if __name__ == "__main__":
    logging.basicConfig()
    logging.getLogger("").setLevel(logging.INFO)
    main()

Upvotes: 0

Views: 1614

Answers (1)

2ps
2ps

Reputation: 15946

You need to use the signature() method when adding a link. The signature method can be abbreviated as s. So in your above code, use this:


df main():
    while True:
        time.sleep(10)
        log.info("About to schedule a task:")
        time_consuming_thing.apply_async(link=log_some_information.signature())

Upvotes: 0

Related Questions