Reputation: 322
I have a Celery job that runs queries against a MySQL database, but it always hits a Lock Wait Timeout.
After digging into the database queries, I realized that Celery triggered another copy of the job after 1800 seconds, which caused my database issue. I don't know why, since my job had not failed yet!
@celery.task(bind=True, acks_late=True)
def etl_pipeline(dev=dev, test=test):
I can tell that MySQL received the same query again, so it could be that Celery is triggering the same job. Why am I getting a retry here, when the default retry delay is 180 sec (3 min)?
Here is the official doc:
default_retry_delay = 180
Default time in seconds before a retry of the task should be executed. 3 minutes by default.
But in my case it is 1800 sec.
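For what it's worth, as far as I understand the docs, that delay only applies when a task explicitly calls retry, which I am not doing anywhere. This is how an explicit retry would look (a sketch; run_queries is a hypothetical stand-in for my actual SQL work, and the task name is just illustrative):

@celery.task(bind=True, default_retry_delay=180, max_retries=3)
def etl_pipeline_with_retry(self, dev=dev, test=test):
    try:
        run_queries(dev, test)  # hypothetical helper for the actual SQL work
    except Exception as exc:
        # self.retry() re-queues the task after default_retry_delay seconds
        raise self.retry(exc=exc)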
Also, my broker logged another warning; I'm not sure if it is related:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
RabbitMQ config:
RABBITMQ_SERVER = 'amqp://{}:{}@{}'.format(
    os.getenv('RABBITMQ_USER'),
    os.getenv('RABBITMQ_PASS'),
    os.getenv('RABBITMQ_HOST')
)
broker_url = '{}/{}'.format(
    RABBITMQ_SERVER,
    os.getenv('RABBITMQ_VHOST'),
)
backend = 'amqp'
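The deprecation warning suggests moving off the amqp result backend. If I read the docs right, that would only change the result backend setting (sketch, assuming new-style lowercase Celery settings; the broker_url above stays the same):

# unchanged broker_url from above; only the result backend moves off amqp
result_backend = 'rpc://'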
How can I solve this? Thank you!
Celery: 4.2.0
I am using job = chain(single_job), but there is only that one single_job in the chain, and I start it with job().
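For reference, this is roughly how I build and start it (the arguments are illustrative; a one-task chain should behave like calling the task directly):

from celery import chain

job = chain(single_job.s(dev=dev, test=test))  # signature form; kwargs illustrative
result = job.apply_async()
# for a single task this is equivalent to single_job.apply_async(kwargs={...})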
mysql> show processlist;
+-------+------+---------------+------------------+---------+------+-----------+
| Id | User | Host | db | Command | Time | State |
+-------+------+---------------+------------------+---------+------+-----------+
| 97189 | clp | 172.11.17.202 | bain_ai_database | Query | 0 | init |
| 97488 | clp | 172.11.11.252 | bain_ai_database | Query | 1505 | executing |
| 97489 | clp | 172.11.11.252 | bain_ai_database | Sleep | 1851 | |
| 97543 | clp | 172.21.6.242 | bain_ai_database | Query | 51 | updating |
| 97544 | clp | 172.21.6.242 | bain_ai_database | Sleep | 51 | |
+-------+------+---------------+------------------+---------+------+-----------+
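To dig into the lock itself, I can also check InnoDB's lock wait timeout and the open transactions. A diagnostic sketch using PyMySQL (connection values are placeholders):

import pymysql

conn = pymysql.connect(host='...', user='clp', password='...', database='bain_ai_database')
with conn.cursor() as cur:
    # default is 50 seconds; a query blocked longer raises "Lock wait timeout exceeded"
    cur.execute("SHOW VARIABLES LIKE 'innodb_lock_wait_timeout'")
    print(cur.fetchone())
    # lists current transactions, including ones stuck waiting on locks
    cur.execute("SELECT trx_id, trx_state, trx_started, trx_mysql_thread_id "
                "FROM information_schema.innodb_trx")
    for row in cur.fetchall():
        print(row)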
Upvotes: 0
Views: 273
Reputation: 15986
Depending on how you execute the SQL query, here is what I would try. (1) Since you have bind=True
, the task instance is passed as the first parameter to your function. The convention in Celery is to call that first parameter self
. (2) You want to catch the database-level exception that is occurring so you can see exactly what it is.
from celery.utils.log import get_task_logger

log = get_task_logger(__name__)

@celery.task(bind=True, acks_late=True)
def etl_pipeline(self, dev=dev, test=test):
    try:
        # try querying the database here using sqlalchemy or mysqlconnect??
        ...
    except Exception as ex:
        # for now, log the exception and type so that you can drill down into what is happening
        log.info('[etl_pipeline] exception of type %s.%s: %s',
                 ex.__class__.__module__, ex.__class__.__name__, ex)
        raise
The debugging output you get from the logging should help you determine which error you are hitting on the client side.
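Once the log shows you the concrete type, narrow the except clause to it. For example, if you are on SQLAlchemy, a MySQL lock wait timeout typically surfaces as sqlalchemy.exc.OperationalError (MySQL error code 1205); a follow-up sketch:

from sqlalchemy.exc import OperationalError

@celery.task(bind=True, acks_late=True)
def etl_pipeline(self, dev=dev, test=test):
    try:
        ...  # the actual queries go here
    except OperationalError as ex:
        # MySQL's "Lock wait timeout exceeded" arrives as error code 1205
        log.warning('[etl_pipeline] database error: %s', ex)
        raise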
Upvotes: 1