Reputation: 791
I'm trying to offload POST request calls to our API to Celery, as we're going to be sending up to 10 requests per second to our API soon, each of which will have over 100 objects to create in our DB. I figured I'd add them to a queue and let Redis + Celery handle that then work from there.
I'm running into a few issues though.
First, my celery settings:
########## CELERY
import djcelery
djcelery.setup_loader()
INSTALLED_APPS += ['expert.taskapp.celery.CeleryConfig']
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://127.0.0.1:6379')
if CELERY_BROKER_URL == 'django://':
    CELERY_RESULT_BACKEND = 'redis://'
else:
    CELERY_RESULT_BACKEND = CELERY_BROKER_URL
########## END CELERY
I'm using class-based views with Django REST Framework; this is my view so far:
from celery import shared_task
from celery.decorators import task
from .tasks import create
class DataCreateAPIView(CreateAPIView):
    def create(self, request, *args, **kwargs):
        create.delay(request)
So the idea is to let the create view handle everything up until I get to the create portion of the process, where I immediately offload the create task to Celery.
In my tasks.py file I then have this:
from celery import shared_task
from celery.decorators import task
from expert.models import Chamber, Parameter, Sensor, Data
@task(name='POST request Data point.')
def create(self, request, *args, **kwargs):
    queryset = Data.objects.all()
    queryset = DataCreateSerializer.setup_eager_loading(queryset)
    # serializer_class = DataCreateSerializer
    try:
        sensor = Sensor.objects.get(serial_number=request.data["data_source"])
        request.data["data_source"] = sensor.id
    except Sensor.DoesNotExist:
        print("Sensor serial number " + str(request.data["data_source"]) + " not registered.")
        return Response(status=status.HTTP_404_NOT_FOUND)
    dataDict = dict(request.data)
    for param in dataDict['parameters']:
        Parameter.objects.get_or_create(parameter_name=param, parameter_position="None")
    final_data = []
    for data in dataDict['data_array']:
        zipped = zip(dataDict['parameters'], data['values'])
        for parameter, value in zipped:
            # parameter = Parameter.objects.get_or_create(parameter_name=parameter, parameter_position="None")[0]
            parameter = Parameter.objects.get(parameter_name=parameter)
            final_data.append({
                "sensor": sensor.id,
                "parameter": parameter.id,
                "time": data['time'],
                "parameter_value": value
            })
    serializer = DataCreateSerializer(data=final_data, many=True)
    if serializer.is_valid():
        serializer.save()
        return Response(status=status.HTTP_200_OK)
    return Response(serializer.errors)
There I take in the large request, modify it a bit to fit our schema, then write the results to the DB.
Now, this all works if I just take that very same "create" function and leave it straight on the CreateAPIView.
When I try to do it with Celery, the task shows up in my task list when I initialize the Celery workers, but I can't get the request to reach Celery. It breaks before then with the following error:
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/core/handlers/exception.py" in inner
42. response = get_response(request)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/core/handlers/base.py" in _get_response
187. response = self.process_exception_by_middleware(e, request)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/core/handlers/base.py" in _get_response
185. response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/usr/lib/python3.5/contextlib.py" in inner
30. return func(*args, **kwds)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/views/decorators/csrf.py" in wrapped_view
58. return view_func(*args, **kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/views/generic/base.py" in view
68. return self.dispatch(request, *args, **kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/views.py" in dispatch
489. response = self.handle_exception(exc)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/views.py" in handle_exception
449. self.raise_uncaught_exception(exc)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/views.py" in dispatch
486. response = handler(request, *args, **kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/generics.py" in post
192. return self.create(request, *args, **kwargs)
File "/home/luke/Projects/expert/impedans_expert/impedans_expert/expert/api/views.py" in create
249. create.delay(request)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/task.py" in delay
461. return self.apply_async(args, kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/task.py" in apply_async
573. **dict(self._get_exec_options(), **options)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/base.py" in send_task
354. reply_to=reply_to or self.oid, **options
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/amqp.py" in publish_task
310. **kwargs
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/messaging.py" in publish
165. compression, headers)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/messaging.py" in _prepare
241. body) = dumps(body, serializer=serializer)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in dumps
164. payload = encoder(data)
File "/usr/lib/python3.5/contextlib.py" in __exit__
77. self.gen.throw(type, value, traceback)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in _reraise_errors
59. reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/five.py" in reraise
131. raise value.with_traceback(tb)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in _reraise_errors
55. yield
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in dumps
164. payload = encoder(data)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in pickle_dumps
356. return dumper(obj, protocol=pickle_protocol)
Exception Type: EncodeError at /expert/api/data/create/
Exception Value: cannot serialize '_io.BufferedReader' object
I honestly have no clue how to go on from there. I've tried googling the exception, but even though I found one page where things looked similar-ish, I couldn't make sense of the thread.
I would greatly appreciate any help in this matter.
Upvotes: 1
Views: 7016
Reputation: 556
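Function-based Celery tasks require that the arguments passed to them are serializable. The error you're getting implies that an argument is not: the only thing you pass to the task is the request object, and the '_io.BufferedReader' named in the exception is something that object holds on to.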
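To illustrate the serialization point, here is a minimal stdlib-only sketch. `fake_request` is a hypothetical stand-in, not a real DRF request: it only assumes that the request carries an open buffered stream next to the parsed data, which is what the '_io.BufferedReader' in the error message suggests. The sample payload values are made up. Pickling the whole object fails, while pickling just the parsed payload works.

```python
import io
import pickle
from types import SimpleNamespace

# Hypothetical stand-in for a request: parsed data plus an open input stream.
payload = {"data_source": "SN-001", "parameters": ["temp"], "data_array": []}
fake_request = SimpleNamespace(
    data=payload,
    stream=io.BufferedReader(io.BytesIO(b"raw body")),
)


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# The stream attribute makes the whole object unpicklable.
whole_request_ok = is_picklable(fake_request)
# The parsed payload is a plain dict of built-in types.
payload_only_ok = is_picklable(fake_request.data)
```

By the same reasoning, and assuming the request body is JSON, the view would queue the parsed data (something like `create.delay(request.data)`) rather than the request itself.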
First you need to fix your create function signature, as it should not have self as an argument:
@task(name='POST request Data point.')
def create(request):
    ...
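Once the task receives plain data instead of the request, the reshaping step from the question can be written as a pure function that is easy to test outside Celery. This is only a sketch under assumptions: `build_rows` and `parameter_ids` are hypothetical names, the payload is assumed to be the already-parsed JSON body with the `parameters` and `data_array` keys used in the question, and the sensor and parameter lookups are assumed to have been done by the caller.

```python
def build_rows(payload, sensor_id, parameter_ids):
    """Flatten a parsed payload into one row per (time, parameter) pair.

    payload       -- dict with 'parameters' and 'data_array' keys (assumed shape)
    sensor_id     -- primary key of the sensor, looked up by the caller
    parameter_ids -- dict mapping parameter name to primary key (hypothetical)
    """
    rows = []
    for point in payload["data_array"]:
        # Pair each parameter name with the value at the same position.
        for name, value in zip(payload["parameters"], point["values"]):
            rows.append({
                "sensor": sensor_id,
                "parameter": parameter_ids[name],
                "time": point["time"],
                "parameter_value": value,
            })
    return rows


# Made-up sample data for illustration only.
example_payload = {
    "parameters": ["temp", "pressure"],
    "data_array": [
        {"time": "2017-01-01T00:00:00Z", "values": [21.5, 1.2]},
        {"time": "2017-01-01T00:00:01Z", "values": [21.6, 1.3]},
    ],
}
rows = build_rows(example_payload, sensor_id=7,
                  parameter_ids={"temp": 1, "pressure": 2})
```

Inside the task, the resulting list could then be passed to `DataCreateSerializer(data=rows, many=True)` as in the question. Keep in mind that a task runs in a worker and has no HTTP client to answer, so returning `Response` objects from it does not reach the caller; the view would need to return its own response after queueing.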
Let me know if that works, as I'm looking into a similar solution.
Upvotes: 3
Reputation: 935
It's hard to tell without being able to debug, but in any case I would start by changing DataCreateAPIView.create to another name, since it may conflict with the function of the same name in tasks.py.
Upvotes: -2