Reputation: 1380
I have written a Celery Task class like this:
myapp.tasks.py
from __future__ import absolute_import, unicode_literals
from .services.celery import app
from .services.command_service import CommandService
from exceptions.exceptions import *
from .models import Command
class CustomTask(app.Task):

    def run(self, json_string, method_name, cmd_id: int):
        command_obj = Command.objects.get(id=cmd_id)  # type: Command
        try:
            val = eval('CommandService.{}(json_string={})'.format(method_name, json_string))
            status, error = 200, None
        except Exception as e:
            auto_retry = command_obj.auto_retry
            if auto_retry and isinstance(e, CustomError):
                command_obj.retry_count += 1
                command_obj.save()
                return self.retry(countdown=CustomTask._backoff(command_obj.retry_count), exc=e)
            elif auto_retry and isinstance(e, AnotherCustomError) and command_obj.retry_count == 0:
                command_obj.retry_count += 1
                command_obj.save()
                print("RETRYING NOW FOR DEVICE CONNECTION ERROR. TRANSACTION: {} || IP: {}".format(
                    command_obj.transaction_id, command_obj.device_ip))
                return self.retry(countdown=command_obj.retry_count * 2, exc=e)
            val = None
            status, error = self._find_status_code(e)
        return_dict = {"error": error, "status_code": status, "result": val}
        return return_dict

    @staticmethod
    def _backoff(attempts):
        return 2 ** attempts

    @staticmethod
    def _find_status_code(exception):
        if isinstance(exception, APIException):
            detail = exception.default_detail if exception.detail is None else exception.detail
            return exception.status_code, detail
        return 500, CustomTask._get_generic_exc_msg(exception)

    @staticmethod
    def _get_generic_exc_msg(exc: Exception):
        s = ""
        try:
            for msg in exc.args:
                s += msg + ". "
        except Exception:
            s = str(exc)
        return s


CustomTask = app.register_task(CustomTask())
The Celery App definition:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, Task
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
_celery_broker = settings.CELERY_BROKER  # my broker is amqp://username:password@localhost:5672/myhost
app = Celery('myapp', broker=_celery_broker, backend='rpc://', include=['myapp.tasks', 'myapp.controllers'])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(['myapp'])
app.conf.update(
    result_expires=4800,
    task_acks_late=True
)
My __init__.py, as the tutorial recommended:
from .celery import app as celery_app
__all__ = ['celery_app']
The controller that is running the task:
from __future__ import absolute_import, unicode_literals
from .services.log_service import LogRunner
from myapp.services.command_service import CommandService
from exceptions.exceptions import *
from myapp.services.celery import app
from myapp.services.tasks import MyTask
from .models import Command
class MyController:

    def my_method(self, json_string):
        # <non-async set-up stuff here>
        cmd_obj = Command.objects.create(<stuff>)  # type: Command
        task_exec = MyTask.delay(json_string, MyController._method_name, cmd_obj.id)
        cmd_obj.task_id = task_exec
        try:
            return_dict = task_exec.get()
        except Exception as e:
            self._logger.error("ERROR: IP: {} and transaction: {}. Error Type: {}, "
                               "Celery Error: {}".format(ip_addr, transaction_id, type(e), e))
            status_code, error = self._find_status_code(e)
            return_dict = {"error": error, "status_code": status_code, "result": None}
        return return_dict
**So here is my issue:**
When I hit the Django view with one request at a time, one after the other, this works perfectly fine.
However, the external service I am hitting will throw an error for two concurrent requests (and that is expected; that is OK). Upon getting the error, I retry my task automatically.
Here is the weird part: upon retry, the .get() in my controller stops working for all concurrent requests. My controller just hangs there, even though I know Celery is actually executing the task! Here are the logs from the Celery run:
[2018-09-25 19:10:24,932: INFO/MainProcess] Received task: myapp.tasks.MyTask[bafd62b6-7e29-4c39-86ff-fe903d864c4f]
[2018-09-25 19:10:25,710: INFO/MainProcess] Received task: myapp.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4] <-- THIS WILL FAIL BUT THAT IS OK
[2018-09-25 19:10:25,794: ERROR/ForkPoolWorker-1] Could not connect to device with IP <some ip> at all. Retry Later plase
[2018-09-25 19:10:25,798: WARNING/ForkPoolWorker-1] RETRYING NOW FOR DEVICE CONNECTION ERROR. TRANSACTION: b_txn || IP: <some ip>
[2018-09-25 19:10:25,821: INFO/MainProcess] Received task: myapp.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4] ETA:[2018-09-25 19:10:27.799473+00:00]
[2018-09-25 19:10:25,823: INFO/ForkPoolWorker-1] Task myapp.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4] retry: Retry in 2s: AnotherCustomError('Could not connect to IP <some ip> at all.',)
[2018-09-25 19:10:27,400: INFO/ForkPoolWorker-2] executed command some command at IP <some ip>
[2018-09-25 19:10:27,418: INFO/ForkPoolWorker-2] Task myapp.tasks.MyTask[bafd62b6-7e29-4c39-86ff-fe903d864c4f] succeeded in 2.4829552830196917s: {'error': None, 'status_code': 200, 'result': True}
<some command output here from a successful run> **<-- belongs to task bafd62b6-7e29-4c39-86ff-fe903d864c4f**
[2018-09-25 19:10:31,058: INFO/ForkPoolWorker-2] executed some command at IP <some ip>
[2018-09-25 19:10:31,059: INFO/ForkPoolWorker-2] Task command_runner.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4] succeeded in 2.404364461021032s: {'error': None, 'status_code': 200, 'result': True}
<some command output here from a successful run> **<-- belongs to task 8d3b4279-0b7e-48cf-b45d-0f1f89e213d4 which errored and retried itself**
So as you can see, the task does run on Celery! It's just that the .get() in my controller is unable to pick the results back up, for either the successful tasks or the ones that errored and retried.
Often, the error I get when running concurrent requests is "Received 0x50 while expecting 0xce". What is that? What does it mean? Again, weirdly enough, all of this works when doing one request after another, without Django handling multiple incoming requests, although I haven't had a task retry itself for single requests.
Upvotes: 2
Views: 934
Reputation: 15946
The RPC result backend (which is what your .get() is waiting on) only delivers a result once, to the client that started the task, and it does not survive a Celery/broker restart. As the Celery documentation puts it:
a result can only be retrieved once, and only by the client that initiated the task. Two different processes can’t wait for the same result.
The messages are transient (non-persistent) by default, so the results will disappear if the broker restarts. You can configure the result backend to send persistent messages using the result_persistent setting.
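For completeness, enabling persistent result messages on the rpc:// backend is just a configuration flag. A minimal sketch, assuming the app object from your celery.py (note this does not change the one-result, one-consumer behaviour):

    # Sketch: keep the rpc:// backend but publish persistent result messages
    # so they survive a broker restart.
    app.conf.update(
        result_backend='rpc://',
        result_persistent=True,
    )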
So what appears to be happening is that the exception/retry breaks the RPC result flow between the worker and the calling controller, so your .get() never receives the result. Given your use case, it may make more sense to use a permanent result backend such as Redis or a database.
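For example, a minimal sketch of switching the result backend to Redis (the URL below is illustrative; a package such as django-celery-results would similarly let you keep results in the Django database):

    # Sketch: store results in Redis instead of rpc:// so they persist
    # server-side and can be fetched independently of the producing connection.
    app = Celery(
        'myapp',
        broker=_celery_broker,
        backend='redis://localhost:6379/0',  # illustrative Redis URL
        include=['myapp.tasks', 'myapp.controllers'],
    )
    app.conf.update(
        result_expires=4800,
        task_acks_late=True
    )

With a stored backend like this, the controller's task_exec.get() reads the result from Redis rather than from a transient reply queue tied to the original connection.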
Upvotes: 1