Reputation: 1269
In my current project, I need to get data from 700+ endpoints and then send that data to another 700+ endpoints. My approach is to use Django + Celery + Redis, putting roughly 70 endpoints on each worker so that there are approximately 10 workers which retrieve the data and then post it.
For this, I am using a chord to run the tasks in parallel and then calculate the time it takes.
The problem is that Celery is running the same task multiple times. task_get_data is the main method: it first gets the list of websites, splits it into groups of 70, and then calls task_post_data using a chord.
In the output below you can see website_A, website_B, etc. multiple times. I have manually checked my data and there is no repetition of websites, but when the Celery task is submitted, multiple entries are created.
Also, is there any way to monitor the number of workers and what they are processing?
Below is the code
import os

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_backend.settings')

app = Celery('django_backend', backend='redis://localhost:6379', broker='redis://localhost:6379')
app.config_from_object('django.conf:settings', namespace='CELERY')
# app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

import datetime
import json

from celery import chord

# task_helper, client, PostDataDetails and get_time_difference are project-specific helpers not shown here.

def post_data(json_obj, website):
    md = []
    for items in json_obj:
        md = md + items['data']

    n = 50
    list_of_objects = [md[i:i+n] for i in range(0, len(md), n)]
    print("Starting to post data using post_data")
    for items in list_of_objects:
        data_details = PostDataDetails()
        data_details.data = items
        post_data_response = None
        try:
            post_data_response = client.post_data(post_data_details=data_details)
            print("Successfully posted data with status code " + str(post_data_response.status) + " for " + website)
        except Exception as e:
            # there may be no response to read a status from if the call itself failed
            return post_data_response.status if post_data_response else None
def read_from_file():
    with open('./django_backend/data.json') as json_file:
        data = json.load(json_file)
        # print(data)
        return data

def split_list(l, n):
    website_list = [l[i:i+n] for i in range(0, len(l), n)]
    return website_list
#-----------this is the main method---------
@app.task(bind=True, name="task_get_data")
def task_get_data(self):
start_time = datetime.datetime.now()
try:
website = task_helper.get_data()
task_helper.write_logs("info", "Successfully read from cache")
except Exception as e:
task_helper.write_logs("error", "Error in reading from cache. Error" + str(e))
website_list_chunks = split_list(list(website.keys()),70)
callback = get_time_difference.s(start_time)
try:
task_helper.write_logs("info", "Starting the task_post_poller_data task to post data")
header = [task_post_data.s(website_list) for website_list in website_list_chunks]
result = chord(header)(callback)
print(result)
task_helper.write_logs("info", "Successfully completed the task to post data")
except Exception as e:
task_helper.write_logs("error", "Error in creating task_post_data task. Error" + str(e))
@app.task(bind=True, name="task_post_data")
def task_post_data(self,website_list=None) -> dict:
json_object_response = True
post_data_response = None
for website in website_list:
if json_object_response:
file_data = read_from_file()
try:
post_data_response = post_data(file_data, website)
# pass
except Exception as e:
print("error", "Error in creating task_post_poller_data task. Error" + str(e))
return post_data_response
I am running the worker using the command
celery -A django_backend worker -l debug --purge
and I am submitting the task like this:
python manage.py shell
>>>from django_backend.tasks import task_get_data
>>>task_get_data.delay()
Below is the output from the console
[2021-07-20 19:54:54,789: INFO/ForkPoolWorker-3] Successfully posted data with status code 200 for website_D
[2021-07-20 19:54:54,835: INFO/ForkPoolWorker-5] Successfully posted data with status code 200 for website_E
[2021-07-20 19:54:54,840: INFO/ForkPoolWorker-2] Successfully posted data with status code 200 for website_B
[2021-07-20 19:54:54,843: INFO/ForkPoolWorker-1] Successfully posted data with status code 200 for website_A
[2021-07-20 19:54:54,882: INFO/ForkPoolWorker-6] Successfully posted data with status code 200 for website_P
[2021-07-20 19:54:54,891: INFO/ForkPoolWorker-8] Successfully posted data with status code 200 for website_I
[2021-07-20 19:54:54,895: INFO/ForkPoolWorker-4] Successfully posted data with status code 200 for website_R
[2021-07-20 19:54:55,021: INFO/ForkPoolWorker-3] Successfully posted data with status code 200 for website_D
[2021-07-20 19:54:55,025: INFO/ForkPoolWorker-7] Successfully posted data with status code 200 for website_C
[2021-07-20 19:54:55,073: INFO/ForkPoolWorker-2] Successfully posted data with status code 200 for website_B
[2021-07-20 19:54:55,086: INFO/ForkPoolWorker-1] Successfully posted data with status code 200 for website_A
Upvotes: 1
Views: 2636
Reputation: 1107
It is one of the known issues with Celery and Redis. In one of my projects I used to assign a unique identifier in the cache for each task and then, at the beginning of the task, check whether the key already exists. You can write a context manager for such a thing, something like this:
import os
from contextlib import contextmanager
from datetime import datetime, timedelta

from django.core.cache import cache

@contextmanager
def task_lock(lock_id, oid, lock_expire_seconds=600, unlock_after_finish=False):
    """
    Make sure the task runs only once.
    :param lock_id: unique id of the task
    :param oid: unique id of the current job (needed for debugging only)
    :param lock_expire_seconds: the task will be unlocked after x seconds
    :param unlock_after_finish: bool, allow running the next task after the current one finishes
    """
    timeout_at = datetime.utcnow() + timedelta(seconds=lock_expire_seconds)
    oid = "{}-{}".format(os.environ.get("HOSTNAME", ""), oid)
    # cache.add fails (returns False) if the key already exists
    status = cache.add(lock_id, oid, lock_expire_seconds)
    try:
        yield status
    finally:
        # cache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if status and unlock_after_finish and datetime.utcnow() < timeout_at:
            # only release a lock we acquired ourselves, and don't release it
            # if we exceeded the timeout, to lessen the chance of releasing an
            # expired lock owned by someone else
            cache.delete(lock_id)
And then in your task code you can do
def some_task():
    current_time = datetime.utcnow().isoformat()  # used only as the debug identifier (oid)
    with task_lock("task-lock", current_time, lock_expire_seconds=10) as acquired:
        if acquired:
            # do something
            pass
Otherwise, there are some other things in the configuration that you can play with. For example, check these answers.
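One configuration knob commonly mentioned in this context is the Redis visibility timeout: with the Redis broker, a delivered but unacknowledged task is redelivered after that window, which can show up as the same task running more than once. A hedged sketch for the Django settings used in the question (the CELERY_ prefix comes from namespace='CELERY'; the 12-hour value is only illustrative and should exceed your longest task runtime or ETA):
# django_backend/settings.py -- illustrative value, tune to your workload
CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 43200}  # 12 hours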
Upvotes: 5