I'm using the feedparser module to parse RSS feeds, and I need to pass the feedparser output (each entry) to a Celery task.
When I try to pass it, I get an error saying that time.struct_time(tm_year=2015, tm_mon=2, tm_mday=12, tm_hour=8, tm_min=19, tm_sec=11, tm_wday=3, tm_yday=43, tm_isdst=0) is not JSON serializable.
How do I pass the feedparser object to a Celery task?
Here is my code:
rss_content = feedparser.parse(rss_link)
content_entries = rss_content['entries']
for content in content_entries:
    parse_link.apply_async(args=[content, link, json_id, news_category], queue=news_source)  # celery task
How do I do it?
You need to create a custom encoder and decoder that convert your time.struct_time objects into something serializable (a dict) and back again, and then register them in the kombu serializer registry as described in the docs, so that Celery can use your new serializer for the task.
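import calendar
import json
import time

class FeedContentEncoder(json.JSONEncoder):
    """Tag time.struct_time values so the decoder can rebuild them."""

    def _tag(self, obj):
        # struct_time subclasses tuple, so the stdlib encoder would silently emit
        # it as a plain list and never call default(); convert it up front.
        if isinstance(obj, time.struct_time):
            # feedparser's *_parsed values are UTC, so use timegm(), not mktime()
            return {'__type__': '__time__', 'time': calendar.timegm(obj)}
        if isinstance(obj, dict):
            return {key: self._tag(value) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self._tag(value) for value in obj]
        return obj

    def iterencode(self, obj, _one_shot=False):
        # encode() and json.dump() both go through iterencode()
        return super().iterencode(self._tag(obj), _one_shot)

def decode_feed_content(obj):
    # json.loads() calls this object_hook for every decoded JSON object (a dict)
    if obj.get('__type__') == '__time__':
        return time.gmtime(obj['time'])
    return obj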
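One detail worth checking when relying on the standard library's json module: time.struct_time is a tuple subclass, so json.dumps serializes it as a plain list and never hands it to a custom default() method. The quick check below uses a hypothetical CountingEncoder that only records what reaches default():

```python
import json
import time

class CountingEncoder(json.JSONEncoder):
    """Hypothetical encoder that records every object passed to default()."""
    seen = []

    def default(self, obj):
        self.seen.append(obj)
        return super().default(obj)

stamp = time.gmtime(0)  # 1970-01-01 00:00:00 UTC
encoded = json.dumps({'published_parsed': stamp}, cls=CountingEncoder)
print(encoded)  # {"published_parsed": [1970, 1, 1, 0, 0, 0, 3, 1, 0]}
print(CountingEncoder.seen)  # [] : default() was never called
```

This is why an encoder for feedparser entries has to convert struct_time values before the standard encoding step runs, instead of waiting for default() to be called.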
Next, notify kombu about the new serializer by registering your dump and load functions in its serializer registry:
from kombu.serialization import register

def feed_content_json_dumps(obj):
    # Serialize with the custom encoder so struct_time values get tagged
    return json.dumps(obj, cls=FeedContentEncoder)

def feed_content_json_loads(obj):
    # Deserialize and turn tagged dicts back into struct_time values
    return json.loads(obj, object_hook=decode_feed_content)

register('feedcontentjson',
         feed_content_json_dumps,
         feed_content_json_loads,
         content_type='application/x-feedcontent-json',
         content_encoding='utf-8')
Finally, tell Celery to use the new serializer for the task, as described in the Celery docs, by passing the serializer parameter when you call it:
parse_link.apply_async(args=[content, link, json_id, news_category], queue=news_source, serializer='feedcontentjson')
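The serializer can also be set on the app instead of on every call, and the worker has to be allowed to accept it: since Celery 4, accept_content defaults to JSON only, and messages with other content types are rejected. A minimal configuration sketch, assuming Celery 4+ lowercase setting names (Celery 3.1 uses CELERY_TASK_SERIALIZER and CELERY_ACCEPT_CONTENT) and a hypothetical app named 'tasks':

```python
from celery import Celery

app = Celery('tasks')  # hypothetical app name

# The worker must also import the module that calls
# register('feedcontentjson', ...) before it consumes any messages.
app.conf.update(
    task_serializer='feedcontentjson',           # default serializer for tasks
    accept_content=['json', 'feedcontentjson'],  # serializers the worker accepts
)
```

With task_serializer set this way, the serializer='feedcontentjson' argument to apply_async becomes optional.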
Hope this helps.