Praful Bagai

Reputation: 17402

Python: How to pass a feedparser object to a Celery task?

I'm using the feedparser module to parse RSS feeds. I need to pass the feedparser object to a celery task.

Upon trying to pass the object, I receive an error saying time.struct_time(tm_year=2015, tm_mon=2, tm_mday=12, tm_hour=8, tm_min=19, tm_sec=11, tm_wday=3, tm_yday=43, tm_isdst=0) is not JSON serializable

How do I pass the feedparser object to a celery task?

Here is my code:

rss_content = feedparser.parse(rss_link)
content_entries = rss_content['entries']
for content in content_entries:
    parse_link.apply_async(args=[content, link, json_id, news_category], queue=news_source)  # celery task

How do I do it?

Upvotes: 1

Views: 1124

Answers (1)

Ozgur Vatansever

Reputation: 52203

You need to write a custom encoder and decoder that convert each time.struct_time object into something serializable (a dict) and back again, and then register them in the kombu serializer registry as described in the docs, so that celery can use the new serializer for its tasks.

import datetime
import json
import time

class FeedContentEncoder(json.JSONEncoder):
    def default(self, obj):
        # Tag struct_time values so the decoder can recognise them later.
        if isinstance(obj, time.struct_time):
            epoch = int(time.mktime(obj))
            return {'__type__': '__time__', 'time': epoch}
        # Defer to the base class for anything else (it raises TypeError).
        return json.JSONEncoder.default(self, obj)

def decode_feed_content(obj):
    # Used as object_hook, so it is called with every decoded JSON object (a dict).
    if isinstance(obj, dict) and obj.get('__type__') == '__time__':
        return datetime.datetime.fromtimestamp(obj['time']).timetuple()
    return obj
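
To see what the decoding half does on its own, here is a minimal sketch using only the standard library. The payload string is a made-up example of the tagged form described above, not output captured from a real feed, and the 'published_parsed' key is only there for illustration.

```python
import datetime
import json
import time


def decode_feed_content(obj):
    # json.loads calls this hook for every JSON object, innermost first.
    if isinstance(obj, dict) and obj.get('__type__') == '__time__':
        return datetime.datetime.fromtimestamp(obj['time']).timetuple()
    return obj


# Hypothetical payload: one entry whose date was tagged with '__time__'.
payload = (
    '{"title": "Example", '
    '"published_parsed": {"__type__": "__time__", "time": 1423729151}}'
)

entry = json.loads(payload, object_hook=decode_feed_content)
print(type(entry['published_parsed']))
```

The tagged dict comes back as a time.struct_time in the local timezone, while untagged dicts such as the outer entry pass through unchanged.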

Next, tell kombu about the new serialization format by registering the two functions below in its serializer registry.

from kombu.serialization import register

def feed_content_json_dumps(obj):
    return json.dumps(obj, cls=FeedContentEncoder)

def feed_content_json_loads(obj):
    return json.loads(obj, object_hook=decode_feed_content)

register('feedcontentjson', 
         feed_content_json_dumps, 
         feed_content_json_loads, 
         content_type='application/x-feedcontent-json', 
         content_encoding='utf-8') 

Finally, tell celery to use the new serializer for this task. As the celery docs describe, you do that by passing the serializer parameter when you call the task.

parse_link.apply_async(args=[content, link, json_id, news_category], queue=news_source, serializer='feedcontentjson')
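
If you would rather not register a serializer at all, a different approach (not the one above) is to convert each entry to plain JSON types before handing it to the task. The sketch below is only an illustration: to_json_safe is a hypothetical helper name, the sample entry is invented, and calendar.timegm assumes the parsed dates are in UTC, which is how the feedparser documentation describes its *_parsed fields.

```python
import calendar
import json
import time


def to_json_safe(value):
    """Recursively replace time.struct_time values with epoch seconds."""
    # struct_time is a tuple subclass, so test for it before list/tuple.
    if isinstance(value, time.struct_time):
        return calendar.timegm(value)  # assumes the struct_time is in UTC
    if isinstance(value, dict):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(item) for item in value]
    return value


# Invented stand-in for one item of rss_content['entries'].
entry = {
    "title": "Example entry",
    "published_parsed": time.gmtime(0),
    "links": [{"href": "http://example.com/"}],
}

safe_entry = to_json_safe(entry)
payload = json.dumps(safe_entry)  # now contains only plain JSON types
```

You would then pass to_json_safe(content) instead of content in the apply_async call and keep the default serializer. The task receives the dates as integers, so it has to convert them back itself if it needs struct_time values.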

Hope this helps.

Upvotes: 1
