Reputation: 161
celery --version 5.1.2 (sun-harmonics)
django --version 3.2.8
I have a celery schedule that has four tasks that run in different timezones. I am using nowfun for setting the timezones and have set CELERY_ENABLE_UTC = False in settings.py. I followed the top response on this SO post: Celery beat - different time zone per task
Note that I made this change this morning - I was running a previous version of the code without these settings.
Currently, I am saving the celery results to CELERY_RESULT_BACKEND = 'django-db'.
Since implementing the change that allows for different tasks to be run according to different timezones I am getting an error when I run celery -A backend beat -l info.
The traceback is very long, so here are the head and tail.
Head:
[2021-10-29 07:29:36,059: INFO/MainProcess] beat: Starting...
[2021-10-29 07:29:36,067: ERROR/MainProcess] Cannot add entry 'celery.backend_cleanup' to database schedule: ValidationError(["Invalid timezone '<LocalTimezone: UTC+00>'"]). Contents: {'task': 'celery.backend_cleanup', 'schedule': <crontab: 0 4 * * * (m/h/d/dM/MY)>, 'options': {'expire_seconds': 43200}}
Tail:
django.core.exceptions.ValidationError: ["Invalid timezone '<LocalTimezone: UTC+00>'"]
Celery beat hangs on this last error message and I have to kill it with ctrl + c.
I went to the Celery documentation and read the instructions for manually resetting the database when timezone-related settings change. It says:
$ python manage.py shell
from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.update(last_run_at=None)
I then found some documentation that said:
Warning: If you change the Django TIME_ZONE setting your periodic task schedule will still be based on the old timezone. To fix that you would have to reset the “last run time” for each periodic task:
from django_celery_beat.models import PeriodicTask, PeriodicTasks
PeriodicTask.objects.all().update(last_run_at=None)
PeriodicTasks.changed()
Note that this will reset the state as if the periodic tasks have never run before.
So I think the cause is exactly what the warning describes: I changed timezones, and the schedule is still based on the old UTC timezone, so I need to reset it. My schedules have run before, and when I type:
>>> PeriodicTask.objects.all().update(last_run_at=None)
I get the response:
13
and then when I enter:
>>> PeriodicTasks.changed()
I get a type error:
TypeError: changed() missing 1 required positional argument: 'instance'
So my question is:
What do I do to update the PeriodicTask and PeriodicTasks models? What arguments should I pass to PeriodicTasks.changed(), and is 13 the expected response for the first command?
Here is my celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
import pytz
from datetime import datetime
# Django settings must be resolvable before Celery reads its configuration.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')

# Celery application for the 'backend' project. With namespace='CELERY',
# only Django settings prefixed with CELERY_ are read into app.conf.
# NOTE(review): with CELERY_ENABLE_UTC = False and no CELERY_TIMEZONE, Celery
# appears to fall back to a LocalTimezone object, which django_celery_beat
# then rejects ("Invalid timezone '<LocalTimezone: ...>'") — consider setting
# CELERY_TIMEZONE explicitly; confirm.
app = Celery('backend')
app.config_from_object(settings, namespace='CELERY')
def uk_time():
    """Return the current timezone-aware datetime on the London clock.

    Intended as a crontab ``nowfun`` so London entries are evaluated in
    Europe/London local time (GMT in winter, BST in summer).
    """
    london = pytz.timezone('Europe/London')
    return datetime.now(london)
def us_time():
    """Return the current timezone-aware datetime on the New York clock.

    Intended as a crontab ``nowfun`` for the New York / NYSE entries.
    'America/New_York' follows daylight-saving time (EST <-> EDT). The
    previously used 'EST' zone is a fixed UTC-05:00 offset, so during EDT
    the entries fired one hour later than the intended New York local hour.
    """
    return datetime.now(pytz.timezone('America/New_York'))
def jp_time():
    """Return the current timezone-aware datetime on the Tokyo clock.

    Intended as a crontab ``nowfun`` for the Asia entry; 'Japan' is the
    tz-database alias of Asia/Tokyo, which has no daylight-saving time.
    """
    tokyo = pytz.timezone('Japan')
    return datetime.now(tokyo)
# Celery Beat Settings
# Every signal run is the same task at minute 0, Monday-Friday
# ('1,2,3,4,5' in crontab numbering, where Sunday = 0). Only the entry
# name, the local hour, the clock used as ``nowfun`` and the region
# argument differ, so the schedule is built from a table.
# NOTE(review): if beat runs with django_celery_beat's DatabaseScheduler,
# these entries are synced into CrontabSchedule rows and ``nowfun`` is
# presumably not persisted there — confirm before relying on it.
app.conf.beat_schedule = {
    entry_name: {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=local_hour,
            nowfun=market_clock,
            day_of_week='1,2,3,4,5',
        ),
        'args': (region,),
    }
    for entry_name, local_hour, market_clock, region in (
        ('generate_signals_london', 8, uk_time, 'UK'),
        ('generate_signals_ny', 7, us_time, 'NY'),
        ('generate_signals_nyse', 9, us_time, 'NYSE'),
        ('generate_signals_asia', 8, jp_time, 'JP'),
    )
}

# Register tasks from installed Django apps (e.g. signals.tasks).
app.autodiscover_tasks()
Upvotes: 1
Views: 1765
Reputation: 161
When tasks run in different timezones and depend on daylight-saving time (DST), the schedule has to be updated dynamically.
Create a task that updates the beat schedule stored in the database, and schedule that task itself to run weekly:
import os
from django import setup
from celery import Celery
from celery.schedules import crontab
# Point Django at the project settings and initialise Django before the
# Celery app (and the django_celery_beat models it uses) is configured.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
setup()
app = Celery('api')
# NOTE(review): this is assigned before config_from_object(); a
# CELERY_TIMEZONE in Django settings may take precedence — confirm the
# effective value via app.conf.timezone at startup.
app.conf.timezone = 'UTC'
# Read every CELERY_-prefixed Django setting into app.conf.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Keep retrying the broker connection if it is unavailable at startup.
app.conf.broker_connection_retry_on_startup = True
# Register database scheduler for beat: schedules live in django_celery_beat
# tables, so update_beat_schedule can rewrite them at runtime.
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
# Register our `update_beat_schedule` task to run every Sunday at 20:00
# (in the app timezone — UTC if the assignment above is the effective one).
app.conf.beat_schedule = {
    'update_beat_schedule': {
        'task': 'utility_app.tasks.update_beat_schedule',
        'schedule': crontab(hour=20, minute=0, day_of_week='sun'),
        'args': ()
    },
}
# Register tasks from installed Django apps (e.g. utility_app.tasks).
app.autodiscover_tasks()
Then have the task build the schedule with everything it needs and update the PeriodicTask model. I look each task up by name first so that existing instances are updated in place; otherwise new instances would be created on every run.
from django_celery_beat.models import PeriodicTask, CrontabSchedule
from celery import shared_task
import json
from pytz import timezone
from datetime import datetime
from utility_app.utils import first_business_days
class UtilsAppError(Exception):
    """Error raised by utility_app code.

    The raw ``message`` argument is kept on ``.message``; the exception's
    args (and therefore ``str(err)``) carry its formatted text.
    """

    def __init__(self, message):
        super().__init__(format(message))
        self.message = message
def get_mt4_timezone():
    """Return the Etc/GMT zone name used as MT4 server time right now.

    The convention here: UTC+3 while US/Eastern observes daylight-saving
    time, UTC+2 otherwise. Etc/GMT names use the inverted POSIX sign, so
    'Etc/GMT-3' means UTC+03:00 and 'Etc/GMT-2' means UTC+02:00.

    Returns:
        str: 'Etc/GMT-3' or 'Etc/GMT-2'.
    """
    eastern = timezone('US/Eastern')
    # Take the current instant *in* Eastern time. The previous version
    # localized naive datetime.now(), i.e. the host's local wall clock, as
    # Eastern. On a UTC host that is the wrong Eastern time and can report
    # the wrong DST state for hours around a transition.
    is_dst = bool(datetime.now(eastern).dst())
    return 'Etc/GMT-3' if is_dst else 'Etc/GMT-2'
def get_year_month_day():
    """Return today's ``(year, month, day)`` on the MT4 server clock.

    The zone comes from ``get_mt4_timezone()``.
    """
    now_in_mt4 = datetime.now(timezone(get_mt4_timezone()))
    return now_in_mt4.year, now_in_mt4.month, now_in_mt4.day
def get_day_of_month_or_week(period='month'):
    """Return a day number for the next first-business-day, per ``period``.

    Both dates come from ``first_business_days`` evaluated on today's MT4
    date. For ``period == 'month'`` return the day-of-month of the first
    business day of next month. For any other value return ``weekday()``
    of the first business day of the following week; assuming a
    date/datetime, that is Python numbering, Monday=0 .. Sunday=6.
    """
    year, month, day = get_year_month_day()
    next_month_start, next_week_start = first_business_days(year, month, day)
    # Both values are computed up front, as before, regardless of period.
    month_day, week_day = next_month_start.day, next_week_start.weekday()
    if period == 'month':
        return month_day
    return week_day
def _build_task_specs(mt4_timezone, day_of_month, day_of_week):
    """Return the desired PeriodicTask definitions, one dict per task.

    Keys:
        'name': unique PeriodicTask name.
        'task': Celery task path.
        'hour': crontab hour; the minute is always 0.
        'timezone': zone the crontab is evaluated in.
        'day_of_month' / 'day_of_week': optional; default '*'.
        'args': positional args.
    """
    return [
        {
            'name': 'monthly_analysis',
            'task': 'signals_app.tasks.technical_analysis',
            'hour': 0,
            'timezone': mt4_timezone,
            'day_of_month': day_of_month,
            'args': (mt4_timezone,),
        },
        {
            'name': 'weekly_analysis',
            'task': 'signals_app.tasks.technical_analysis',
            'hour': 0,
            'timezone': mt4_timezone,
            'day_of_week': day_of_week,
            'args': (mt4_timezone,),
        },
        {
            'name': 'tokyo_bias',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 0,
            'timezone': mt4_timezone,
            'args': ('Tokyo', 'market_open_bias', mt4_timezone),
        },
        {
            'name': 'london_bias',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 8,
            'timezone': mt4_timezone,
            'args': ('London', 'market_open_bias', mt4_timezone),
        },
        {
            'name': 'ny_bias',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 12,
            'timezone': mt4_timezone,
            'args': ('NewYork', 'market_open_bias', mt4_timezone),
        },
        {
            'name': 'nyse_bias',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 16,
            'timezone': mt4_timezone,
            'args': ('NYSE', 'market_open_bias', mt4_timezone),
        },
        {
            'name': 'tokyo_market_open',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 9,
            'timezone': 'Asia/Tokyo',
            'args': ('Tokyo', 'market_open', mt4_timezone),
        },
        {
            'name': 'london_market_open',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 8,
            'timezone': 'Europe/London',
            'args': ('London', 'market_open', mt4_timezone),
        },
        {
            'name': 'ny_market_open',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 8,
            'timezone': 'US/Eastern',
            'args': ('NewYork', 'market_open', mt4_timezone),
        },
        {
            'name': 'nyse_market_open',
            'task': 'signals_app.tasks.process_signal_tasks',
            'hour': 10,
            'timezone': 'US/Eastern',
            'args': ('NYSE', 'market_open', mt4_timezone),
        },
    ]


def _upsert_periodic_task(spec):
    """Create or update the PeriodicTask named ``spec['name']`` to match ``spec``.

    A CrontabSchedule row matching the spec is reused, or created if none
    exists. It is never edited in place, because one row can be shared by
    several PeriodicTasks and editing it would silently reschedule the
    others. The PeriodicTask's ``task`` and ``args`` are rewritten too, so
    args that embed the MT4 zone follow DST changes.
    """
    schedule_fields = {
        'minute': 0,
        'hour': spec['hour'],
        'day_of_month': spec.get('day_of_month', '*'),
        'day_of_week': spec.get('day_of_week', '*'),
        # Pin month_of_year so the lookup cannot pick a row that is
        # restricted to particular months.
        'month_of_year': '*',
        'timezone': spec['timezone'],
    }
    # filter().first() instead of get_or_create(): earlier in-place edits
    # may have left duplicate rows, which makes get_or_create raise
    # MultipleObjectsReturned.
    crontab = CrontabSchedule.objects.filter(**schedule_fields).first()
    if crontab is None:
        crontab = CrontabSchedule.objects.create(**schedule_fields)
    # PeriodicTask names are unique. Look up by name and overwrite the
    # schedule, the task path (previously never set on create, leaving it
    # empty) and the JSON-encoded args.
    PeriodicTask.objects.update_or_create(
        name=spec['name'],
        defaults={
            'task': spec['task'],
            'crontab': crontab,
            'args': json.dumps(spec.get('args', [])),
        },
    )


@shared_task
def update_beat_schedule():
    """Recompute DST-dependent schedules and sync them into django_celery_beat.

    The MT4 zone and the target days are computed once. Each entry from
    ``_build_task_specs`` is then upserted by name. Saving through the ORM
    (not queryset.update) lets django_celery_beat's model signals notify a
    running DatabaseScheduler — presumably; confirm for the installed version.

    Raises:
        UtilsAppError: wrapping any failure, chained to the original
            exception; the message names the task being upserted, if any.
    """
    current_name = None
    try:
        mt4_timezone = get_mt4_timezone()
        # NOTE(review): month_of_year stays '*', so the monthly entry fires
        # on this day of *every* month until the next weekly recompute.
        day_of_month = get_day_of_month_or_week('month')
        # get_day_of_month_or_week('week') returns date.weekday()
        # (Monday=0), whereas crontab day_of_week counts Sunday=0. Without
        # this shift the weekly run lands one day early.
        day_of_week = (get_day_of_month_or_week('week') + 1) % 7
        for spec in _build_task_specs(mt4_timezone, day_of_month, day_of_week):
            current_name = spec['name']
            _upsert_periodic_task(spec)
    except Exception as e:
        if current_name is None:
            # Failed before any task was processed.
            raise UtilsAppError(f"Error preparing beat schedule: {e}") from e
        raise UtilsAppError(
            f"Error updating beat schedule for task {current_name}: {e}"
        ) from e
Upvotes: -1