Reputation: 14022
I have some objects I want to send to Celery tasks in my application. These objects are not JSON serializable with the default json library. Is there a way to make Celery serialize/deserialize them with a custom JSON Encoder/Decoder?
Upvotes: 49
Views: 21502
Reputation: 397
Today, there is an easier way to do this with kombu's register_type.
Here is an example implementation for dataclass serialization/deserialization.
from functools import partial
from dataclasses import asdict
from kombu.utils.json import register_type


def class_full_name(clz: type) -> str:
    # Fully qualified class name, passed to register_type as the type marker
    return ".".join([clz.__module__, clz.__qualname__])


def _encoder(obj) -> dict:
    # Dataclass instance -> plain dict
    return asdict(obj)


def _decoder(clz: type, data: dict):
    # Plain dict -> dataclass instance
    return clz(**data)


def register_kombu_type(model):
    register_type(
        model,
        class_full_name(model),
        encoder=_encoder,
        decoder=partial(_decoder, model),
    )
Inspired by zeroohub's solution for Pydantic. Ref
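To see what the encoder/decoder pair above does to the data itself, here is a minimal stdlib-only sketch that round-trips a dataclass through plain JSON, without kombu. The `Point` and `Segment` classes are hypothetical and exist only for illustration. It also shows a limitation worth knowing about: `asdict` flattens nested dataclasses into dicts, and `clz(**data)` does not rebuild them.

```python
import json
from dataclasses import asdict, dataclass
from functools import partial


@dataclass
class Point:  # hypothetical example type
    x: int
    y: int


@dataclass
class Segment:  # hypothetical type with nested dataclass fields
    start: Point
    end: Point


def _encoder(obj) -> dict:
    return asdict(obj)


def _decoder(clz: type, data: dict):
    return clz(**data)


# Flat dataclass: survives the round trip unchanged.
wire = json.dumps(_encoder(Point(1, 2)))
restored = partial(_decoder, Point)(json.loads(wire))

# Nested dataclass: the inner Point values come back as plain dicts.
segment_wire = json.dumps(_encoder(Segment(Point(0, 0), Point(3, 4))))
restored_segment = partial(_decoder, Segment)(json.loads(segment_wire))
```

If your models nest other dataclasses, the decoder would need to rebuild the inner objects explicitly, for example in a custom classmethod on the outer class.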
Upvotes: 0
Reputation: 3527
A bit late here, but you should be able to define a custom encoder and decoder by registering them in the kombu serializer registry, as in the docs: http://docs.celeryproject.org/en/latest/userguide/calling.html#serializers.
For example, the following is a custom datetime serializer/deserializer (built on Python's built-in json module, subclassing json.JSONEncoder) for Django:
myjson.py (put it in the same folder as your settings.py file)
import json
from datetime import datetime
from time import mktime


class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return {
                '__type__': '__datetime__',
                'epoch': int(mktime(obj.timetuple()))
            }
        else:
            return json.JSONEncoder.default(self, obj)


def my_decoder(obj):
    if '__type__' in obj:
        if obj['__type__'] == '__datetime__':
            return datetime.fromtimestamp(obj['epoch'])
    return obj


# Encoder function
def my_dumps(obj):
    return json.dumps(obj, cls=MyEncoder)


# Decoder function
def my_loads(obj):
    return json.loads(obj, object_hook=my_decoder)
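Before wiring these functions into Celery, it can help to round-trip a payload through them directly. The sketch below repeats the encoder and decoder hooks so it runs on its own; the `payload` dict and its keys are made up for illustration. Note that the epoch is stored as whole seconds and converted through local time, so microseconds are dropped on the way back.

```python
import json
from datetime import datetime
from time import mktime


class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return {
                '__type__': '__datetime__',
                'epoch': int(mktime(obj.timetuple())),
            }
        return super().default(obj)


def my_decoder(obj):
    # Called for every decoded JSON object, including nested ones
    if obj.get('__type__') == '__datetime__':
        return datetime.fromtimestamp(obj['epoch'])
    return obj


# Hypothetical task arguments containing a datetime
payload = {'user_id': 7, 'when': datetime(2020, 1, 15, 12, 30, 45, 123456)}

wire = json.dumps(payload, cls=MyEncoder)
restored = json.loads(wire, object_hook=my_decoder)
```

If sub-second precision or time zone information matters for your tasks, a different wire format (for example an ISO 8601 string) would be needed instead of the integer epoch.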
settings.py
# Register your new serializer methods into kombu
from kombu.serialization import register
from .myjson import my_dumps, my_loads
register('myjson', my_dumps, my_loads,
         content_type='application/x-myjson',
         content_encoding='utf-8')
# Tell celery to use your new serializer:
CELERY_ACCEPT_CONTENT = ['myjson']
CELERY_TASK_SERIALIZER = 'myjson'
CELERY_RESULT_SERIALIZER = 'myjson'
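Outside Django, the same registration can be done next to the Celery app object. This is only a configuration sketch: the app name `proj` and the broker URL are placeholders, and it assumes the `myjson` module from above is importable and that you use Celery's lowercase setting names.

```python
from celery import Celery
from kombu.serialization import register

from myjson import my_dumps, my_loads  # the module defined above

# Make the serializer known to kombu under the name 'myjson'
register('myjson', my_dumps, my_loads,
         content_type='application/x-myjson',
         content_encoding='utf-8')

# 'proj' and the broker URL are placeholders for your own values
app = Celery('proj', broker='redis://localhost:6379/0')

app.conf.update(
    accept_content=['myjson'],
    task_serializer='myjson',
    result_serializer='myjson',
)
```

The registration has to run in both the process that sends tasks and the worker process, since each side needs to know how to encode and decode the `application/x-myjson` content type.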
Upvotes: 77