Reputation: 8245
I have a time-consuming celery task which produces a result. I'd like to execute a local callback (or any callback) as soon as that result is available.
My expectation was that I could asynchronously invoke the function which generates the data and then have a callback execute when that function has returned a result. For the purposes of this demo, I want to prove that it works by calling a trivial function like log.info
- but in real life I might want to do something more complex like pass the data onto another function which will store the result.
Is there a simple way to do it? When I run my script I get an error that my task cannot be serialized. I get pretty much the same error if I substitute the celery task for a normal function.
stoneid_1 | File "/usr/local/lib/python3.6/site-packages/kombu/utils/json.py", line 59, in default
stoneid_1 | return super(JSONEncoder, self).default(o)
stoneid_1 | File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
stoneid_1 | o.__class__.__name__)
stoneid_1 | kombu.exceptions.EncodeError: Object of type 'log_some_information' is not JSON serializable
Here's my whole script:
import random
import time
import logging
import socket
from collections import namedtuple
log: logging.Logger = logging.getLogger()
from stoneid.celery.celery import app
TimeConsumingResult = namedtuple("TimeConsumingResult", ["word", "hostname"])
@app.task()
def time_consuming_thing()->TimeConsumingResult:
delay:float = random.randrange(0,3)
time.sleep(delay)
items = ["zip", "zap", "zop"]
return TimeConsumingResult(random.choice(items), socket.gethostname())
@app.task()
def log_some_information(info:TimeConsumingResult)->None:
log.info("Message: %s, host: %s", info.word, info.hostname)
def main():
while True:
time.sleep(10)
log.info("About to schedule a task:")
time_consuming_thing.apply_async(link=log_some_information)
if __name__ == "__main__":
logging.basicConfig()
logging.getLogger("").setLevel(logging.INFO)
main()
Upvotes: 0
Views: 1614
Reputation: 15946
You need to use the signature()
method when adding a link
. The signature
method can be abbreviated as s
. So in your above code, use this:
df main():
while True:
time.sleep(10)
log.info("About to schedule a task:")
time_consuming_thing.apply_async(link=log_some_information.signature())
Upvotes: 0