Reputation: 31
I am using Celery chords to structure parallel AI processing of the page content of large documents. Because this is a single-use function with no public signature, I am pickling the objects to distribute and reaggregate them. The task that processes a single page successfully reads its arguments and performs the needed work; however, it fails when trying to return its result to the queue for subsequent aggregation.
Does anyone know of a way to specify a result_serializer for a single task called via a chord?
chord generation ---

callback = processPageResults.subtask(
    kwargs={'cdd_id': cdoc.cdd_id, 'user_id': user.id},
    options={'serializer': 'pickle'},
)
res = chord([processPage.s(useBold, docPages[i]).set(serializer='pickle')
             for i in range(0, len(docPages))], callback)()

called task ---

@shared_task(serializer='pickle', result_serializer='pickle', bind=True, max_retries=20)
def processPage(self, *args):
    useBold = args[0]
    page = args[1]
    page.processWords(useBold)
    return page
error ---
kombu.exceptions.EncodeError: Object of type DocumentPage is not JSON serializable
Upvotes: 1
Views: 1730
Reputation: 31
After revisiting this, I have a solution for my use case. result_serializer was the wrong concept for me. Because Celery automatically serializes the arguments passed to a task, I solved my problem simply by setting the task_serializer for the callback to 'pickle'. It appears that for a set of chained tasks, result_serializer isn't really useful.
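A minimal sketch of what that looks like, assuming the callback from the question is declared roughly like this (the real signature of processPageResults is not shown, so the argument names here are hypothetical); with serializer='pickle' on the callback task, the DocumentPage objects collected from the chord header arrive pickled instead of JSON-encoded:

from celery import shared_task

@shared_task(serializer='pickle', bind=True)
def processPageResults(self, pages, cdd_id=None, user_id=None):
    # 'pages' is the list of DocumentPage objects returned by the
    # processPage header tasks; cdd_id and user_id come from the
    # kwargs passed when building the subtask.
    ...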
Upvotes: 0
Reputation: 525
Ideally you can set result_serializer in the signature, which works for a normal task.
In the case of a chord, it is not working well for me. On the other hand, you can update the global Celery configuration and it will work.
I am trying to report this as a bug; let's see.
celery = Celery("app_name", backend=result_backend, broker=broker_url)
celery.conf.update(
result_serializer='pickle',
)
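A side note on the same workaround: depending on the setup, the worker also has to be told to accept pickled payloads via accept_content, or it will reject the messages. A slightly fuller sketch, assuming the same broker_url and result_backend as above:

celery = Celery("app_name", backend=result_backend, broker=broker_url)
celery.conf.update(
    task_serializer='pickle',           # arguments sent to tasks are pickled
    result_serializer='pickle',         # results stored in the backend are pickled
    accept_content=['pickle', 'json'],  # workers accept pickled (and JSON) payloads
)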
Upvotes: 4