pythonberg

Reputation: 31

Override result serializer celery chord

I am using Celery chords to structure parallel AI processing of the page content of large documents. Because this is a single-use function with no public signature, I am pickling the objects to distribute and re-aggregate them. The task that processes a single page successfully reads its arguments and performs the needed work; it fails, however, when trying to return its result to the queue for subsequent aggregation.

Does anyone know of a way to specify a result_serializer for a single task called via a chord?

chord generation ---

callback = processPageResults.subtask(
    kwargs={'cdd_id': cdoc.cdd_id, 'user_id': user.id},
    options={'serializer': 'pickle'},
)

res = chord(
    [processPage.s(useBold, docPages[i]).set(serializer='pickle')
     for i in range(len(docPages))],
    callback,
)()

called task ---

@shared_task(serializer='pickle', result_serializer='pickle', bind=True, max_retries=20)
def processPage(self, *args):
    useBold = args[0]
    page = args[1]
    page.processWords(useBold)
    return page

error ---

kombu.exceptions.EncodeError: Object of type DocumentPage is not JSON serializable

Upvotes: 1

Views: 1730

Answers (2)

pythonberg

Reputation: 31

After revisiting this, I have a solution for my use case. result_serializer was the wrong concept for me. Because Celery automatically serializes the arguments passed to a task, I solved my problem simply by setting the task serializer for the callback to 'pickle'. It appears that for a set of chained tasks, result_serializer isn't really useful.

Upvotes: 0

Durai

Reputation: 525

Ideally you can set result_serializer in the signature, which works for a normal task.

In the case of a chord, it did not work well for me. On the other hand, you can update the Celery global configuration, and that does work.

I am trying to report this as a bug. Let's see.

celery = Celery("app_name", backend=result_backend, broker=broker_url)

celery.conf.update(
    result_serializer='pickle',
)

Upvotes: 4
