Usman Shabbir

Reputation: 126

Can't get kwargs data in function in Django Celery Beat

I'm implementing a reminder module in my application using django-celery-beat. I create a crontab schedule and a periodic task, passing a dictionary in the kwargs parameter. The task is saved successfully in the Django periodic task table, but when the scheduled task runs and calls the named function, the function does not accept the kwargs data and an exception is thrown.

settings.py

INSTALLED_APPS = [    
    'django_celery_beat',
]

# CELERY STUFF
CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'

__init__.py

from .celery import app as celery_app

__all__ = ("celery_app",)

celery.py

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings_local")

app = Celery("baseapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Here is the task scheduler function that creates the periodic tasks:

import datetime
import json

from django_celery_beat.models import CrontabSchedule, PeriodicTask


def task_scheduler(raw_data):
    try:
        if raw_data["start"] and raw_data["reminder"]:
            # Fire the reminder `reminder` minutes before the event start time.
            reminder_time = datetime.datetime.strptime(
                raw_data["start"], '%Y-%m-%dT%H:%M:%S.%fZ'
            ) - datetime.timedelta(minutes=raw_data["reminder"])
            reminder, _ = CrontabSchedule.objects.get_or_create(
                minute=reminder_time.minute,
                hour=reminder_time.hour,
                day_of_week="*",
                day_of_month=reminder_time.day,
                month_of_year=reminder_time.month,
            )
            PeriodicTask.objects.update_or_create(
                name=raw_data["summary"],
                task="reminder_notification",
                crontab=reminder,
                expire_seconds=7200,
                # kwargs is stored as a JSON string and decoded when the task is sent.
                kwargs=json.dumps({'test132': '123'}),
            )
    except Exception as error:
        print(error)

Here is the reminder notification function that is called when the scheduled task runs:

@shared_task(name="reminder_notification")
def reminder_notification(*args, **kwargs):
    print("hello task")

Here are the tasks in the database: [screenshot]

Here is the error:

[2021-01-07 06:13:00,021: ERROR/Beat] Message Error: Couldn't apply scheduled task celery15: reminder_notification() got an unexpected keyword argument 'test132'
['  File "/home/usman/Documents/venv/bin/celery", line 8, in <module>\n    sys.exit(main())\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/__main__.py", line 15, in main\n    sys.exit(_main())\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bin/celery.py", line 213, in main\n    return celery(auto_envvar_prefix="CELERY")\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 829, in __call__\n    return self.main(*args, **kwargs)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 782, in main\n    rv = self.invoke(ctx)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke\n    return _process_result(sub_ctx.command.invoke(sub_ctx))\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 1066, in invoke\n    return ctx.invoke(self.callback, **ctx.params)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/core.py", line 610, in invoke\n    return callback(*args, **kwargs)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func\n    return f(get_current_context(), *args, **kwargs)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bin/base.py", line 132, in caller\n    return f(ctx, *args, **kwargs)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bin/worker.py", line 327, in worker\n    worker.start()\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start\n    self.blueprint.start(self)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start\n    step.start(parent)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start\n    return self.obj.start()\n', '  File 
"/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/process.py", line 124, in start\n    self._popen = self._Popen(self)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/context.py", line 276, in _Popen\n    return _default_context.get_context().Process._Popen(process_obj)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/context.py", line 333, in _Popen\n    return Popen(process_obj)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/popen_fork.py", line 24, in __init__\n    self._launch(process_obj)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/popen_fork.py", line 79, in _launch\n    code = process_obj._bootstrap()\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/billiard/process.py", line 327, in _bootstrap\n    self.run()\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 703, in run\n    self.service.start(embedded_process=True)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 627, in start\n    interval = self.scheduler.tick()\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 339, in tick\n    self.apply_entry(entry, producer=self.producer)\n', '  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 272, in apply_entry\n    exc, traceback.format_stack(), exc_info=True)\n']
Traceback (most recent call last):
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 386, in apply_async
    return task.apply_async(entry_args, entry_kwargs,
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/app/task.py", line 526, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: reminder_notification() got an unexpected keyword argument 'test132'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 269, in apply_entry
    result = self.apply_async(entry, producer=producer, advance=False)
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 394, in apply_async
    reraise(SchedulingError, SchedulingError(
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/exceptions.py", line 104, in reraise
    raise value.with_traceback(tb)
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/beat.py", line 386, in apply_async
    return task.apply_async(entry_args, entry_kwargs,
  File "/home/usman/Documents/venv/lib/python3.8/site-packages/celery/app/task.py", line 526, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
celery.beat.SchedulingError: Couldn't apply scheduled task celery15: reminder_notification() got an unexpected keyword argument 'test132'

What am I doing wrong?

Upvotes: 0

Views: 2700

Answers (1)

Usman Shabbir

Reputation: 126

I am now receiving the kwargs data successfully.

Solution:

The already-running worker does not pick up changes made to the task function, so the worker has to be stopped and started again:

celery -A baseapp worker --beat --scheduler django --loglevel=info
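To illustrate why a stale process can produce this error, here is a minimal sketch in plain Python (no Celery required). The traceback shows the TypeError coming from an argument check inside apply_async, which compares the stored kwargs against the task signature that the running process has loaded. The sketch assumes the process had loaded an earlier version of the task that did not accept **kwargs; that earlier version (reminder_notification_old) and the helper accepts() are hypothetical names used only for illustration, and inspect.signature(...).bind stands in for Celery's own check.

```python
import inspect
import json


def reminder_notification_old():
    # Hypothetical earlier version of the task, still loaded in the
    # long-running process: it takes no keyword arguments.
    print("hello task")


def reminder_notification_new(*args, **kwargs):
    # Version from the question: accepts arbitrary keyword arguments.
    print("hello task", kwargs)


def accepts(func, args, kwargs):
    """Return True if func's signature can bind the given arguments."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError:
        return False
    return True


# The periodic task stores kwargs as a JSON string; it is decoded back
# into a dict before being passed to the task.
stored_kwargs = json.loads(json.dumps({"test132": "123"}))

print(accepts(reminder_notification_old, (), stored_kwargs))  # False
print(accepts(reminder_notification_new, (), stored_kwargs))  # True
```

The stored kwargs are fine in both cases; only the signature held in memory differs, which is consistent with a restart fixing the problem.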

Upvotes: 2
