When I try to send a task to RabbitMQ with apply_async, I sometimes get the following error. If I run the function again, the error does not appear.
ERROR:root:maximum recursion depth exceeded while calling a Python object
web | Traceback (most recent call last):
web | File "/home/app/webservice.py", line 75, in execute_task
web | async_result = available_tasks[task_name].apply_async(args=[data], kwargs=kwargs)
web | File "/usr/local/lib/python3.10/dist-packages/celery/app/task.py", line 572, in apply_async
web | return app.send_task(
web | File "/usr/local/lib/python3.10/dist-packages/celery/app/base.py", line 766, in send_task
web | self.backend.on_task_call(P, task_id)
web | File "/usr/local/lib/python3.10/dist-packages/celery/backends/rpc.py", line 164, in on_task_call
web | maybe_declare(self.binding(producer.channel), retry=True)
web | File "/usr/local/lib/python3.10/dist-packages/kombu/common.py", line 109, in maybe_declare
web | return _imaybe_declare(entity, channel, **retry_policy)
web | File "/usr/local/lib/python3.10/dist-packages/kombu/common.py", line 164, in _imaybe_declare
web | return entity.channel.connection.client.ensure(
web | File "/usr/local/lib/python3.10/dist-packages/kombu/connection.py", line 524, in _ensured
web | return fun(*args, **kwargs)
web | File "/usr/local/lib/python3.10/dist-packages/kombu/common.py", line 150, in _maybe_declare
web | entity.declare(channel=channel)
web | File "/usr/local/lib/python3.10/dist-packages/kombu/entity.py", line 606, in declare
web | self._create_queue(nowait=nowait, channel=channel)
web | File "/usr/local/lib/python3.10/dist-packages/kombu/entity.py", line 615, in _create_queue
web | self.queue_declare(nowait=nowait, passive=False, channel=channel)
web | File "/usr/local/lib/python3.10/dist-packages/kombu/entity.py", line 643, in queue_declare
web | ret = channel.queue_declare(
web | File "/usr/local/lib/python3.10/dist-packages/amqp/channel.py", line 1146, in queue_declare
web | return queue_declare_ok_t(*self.wait(
web | File "/usr/local/lib/python3.10/dist-packages/amqp/abstract_channel.py", line 86, in wait
web | self.connection.drain_events(timeout=timeout)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/connection.py", line 523, in drain_events
web | while not self.blocking_read(timeout):
web | File "/usr/local/lib/python3.10/dist-packages/amqp/connection.py", line 529, in blocking_read
web | return self.on_inbound_frame(frame)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/method_framing.py", line 53, in on_frame
web | callback(channel, method_sig, buf, None)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/connection.py", line 535, in on_inbound_method
web | return self.channels[channel_id].dispatch_method(
web | File "/usr/local/lib/python3.10/dist-packages/amqp/abstract_channel.py", line 143, in dispatch_method
web | listener(*args)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/channel.py", line 276, in _on_close
web | self._do_revive()
web | File "/usr/local/lib/python3.10/dist-packages/amqp/channel.py", line 161, in _do_revive
web | self.open()
web | File "/usr/local/lib/python3.10/dist-packages/amqp/channel.py", line 432, in open
web | return self.send_method(
web | File "/usr/local/lib/python3.10/dist-packages/amqp/abstract_channel.py", line 66, in send_method
web | return self.wait(wait, returns_tuple=returns_tuple)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/abstract_channel.py", line 86, in wait
web | self.connection.drain_events(timeout=timeout)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/connection.py", line 523, in drain_events
web | while not self.blocking_read(timeout):
web | File "/usr/local/lib/python3.10/dist-packages/amqp/connection.py", line 529, in blocking_read
web | return self.on_inbound_frame(frame)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/method_framing.py", line 53, in on_frame
web | callback(channel, method_sig, buf, None)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/connection.py", line 535, in on_inbound_method
web | return self.channels[channel_id].dispatch_method(
web | File "/usr/local/lib/python3.10/dist-packages/amqp/abstract_channel.py", line 143, in dispatch_method
web | listener(*args)
web | File "/usr/local/lib/python3.10/dist-packages/amqp/channel.py", line 276, in _on_close
web | self._do_revive()
web | File "/usr/local/lib/python3.10/dist-packages/amqp/channel.py", line 161, in _do_revive
web | self.open()
My Celery configuration for RabbitMQ is this:
from celery import Celery

config = {
    "broker": "amqp://rabbitmq:5672//",
    "backend": "rpc://rabbitmq:5672//"
}
capp = Celery(__name__, broker=config['broker'], backend=config['backend'])
capp.conf.task_default_queue = 'default'
My rabbitmq.conf has only one line uncommented: consumer_timeout = 36000000
The Python app and RabbitMQ run in Docker containers on the same network.
My Python version is 3.10, Celery is 5.1.2, and RabbitMQ is 3.11.7.
EDIT
RabbitMQ also logs the following messages:
2025-02-11 21:16:30.711331+00:00 [error] <0.3095.0> Channel error on connection <0.3088.0> (172.19.0.7:43072 -> 172.19.0.17:5672, vhost: '/', user: 'guest'), channel 1:
2025-02-11 21:16:30.711331+00:00 [error] <0.3095.0> operation none caused a channel exception precondition_failed: delivery acknowledgement on channel 1 timed out. Timeout value used: 36000000 ms. This timeout value can be configured, see consumers doc guide to learn more
2025-02-11T21:17:30.711612+00:00 warning: FORMATTER CRASH: {"Consumer '~ts' on channel ~w and ~ts has timed out waiting for a consumer acknowledgement of a delivery with delivery tag = ~b. Timeout used: ~tp ms. This timeout value can be configured, see consumers doc guide to learn more",[1,1,"queue '9b2910a8-5437-364c-b40e-d8c2e61516ec' in vhost '/'",1,36000000]}
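As a quick sanity check on the numbers in these log lines, the timeout can be converted to hours as sketched below. This is only a unit conversion with Python's standard library; the variable names are illustrative, and the value is the one set in rabbitmq.conf above.

```python
from datetime import timedelta

# Value taken from rabbitmq.conf (consumer_timeout is expressed in milliseconds).
consumer_timeout_ms = 36_000_000

timeout = timedelta(milliseconds=consumer_timeout_ms)
timeout_hours = timeout.total_seconds() / 3600

print(timeout)        # 10:00:00
print(timeout_hours)  # 10.0
```

So the channel is closed only after a delivery has gone unacknowledged for ten hours, which would fit an error that shows up only sometimes.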
I found the problem!
I wasn't acknowledging the task result once the task finished.
The task isn't one that returns an answer, so I didn't fetch the result. I only checked that it had been queued on RabbitMQ, like this:
async_result = available_tasks[task_name].apply_async(args=[data], kwargs=kwargs)
data = {'job_id': async_result.id, 'state': async_result.state}
code = 200
status = True
message = ''
return response(status, code, data, message)
But I should have used something like this:
async_result = available_tasks[task_name].apply_async(args=[data], kwargs=kwargs)
data = {'job_id': async_result.id, 'state': async_result.state, 'job_result': async_result.get()}
code = 200
status = True
message = ''
return response(status, code, data, message)
If I don't call get(), RabbitMQ keeps waiting for the acknowledgement of the result delivery and, once consumer_timeout expires, closes the channel. That was the origin of the problem.
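To make that chain of events concrete, here is a toy model in plain Python. It does not talk to RabbitMQ or Celery; ToyChannel and its methods are hypothetical names that only imitate the rule visible in the broker log above: a delivery left unacknowledged for longer than the consumer timeout gets the channel closed.

```python
class ToyChannel:
    """Toy stand-in for a broker channel (hypothetical, not RabbitMQ's code)."""

    def __init__(self, consumer_timeout_ms):
        self.consumer_timeout_ms = consumer_timeout_ms
        self.unacked = {}   # delivery tag -> time of delivery in ms
        self.is_open = True

    def deliver(self, tag, now_ms):
        # The broker hands a message to the consumer and waits for an ack.
        self.unacked[tag] = now_ms

    def ack(self, tag):
        # Fetching the result is modelled as acknowledging the delivery.
        self.unacked.pop(tag, None)

    def tick(self, now_ms):
        # Close the channel if any delivery has waited longer than the timeout.
        for delivered_at in self.unacked.values():
            if now_ms - delivered_at > self.consumer_timeout_ms:
                self.is_open = False
        return self.is_open


TIMEOUT_MS = 36_000_000  # same value as consumer_timeout in rabbitmq.conf

# Result delivered but never fetched: the channel is closed after the timeout.
never_fetched = ToyChannel(TIMEOUT_MS)
never_fetched.deliver(tag=1, now_ms=0)
closed_channel_state = never_fetched.tick(now_ms=TIMEOUT_MS + 1)

# Result delivered and fetched (acked): the channel stays open.
fetched = ToyChannel(TIMEOUT_MS)
fetched.deliver(tag=1, now_ms=0)
fetched.ack(tag=1)
open_channel_state = fetched.tick(now_ms=TIMEOUT_MS + 1)

print(closed_channel_state, open_channel_state)  # False True
```

Keep in mind that get() blocks until the task has finished, so with this change the web request waits for the task instead of returning right away.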