D_P

Reputation: 862

How to schedule my crawler function periodically in Django using Celery?

I have a view, CrawlerHomeView, which creates a Task object from a form. Now I want to schedule this task periodically with Celery.

I want to schedule the CrawlerHomeView process according to the Task object's search_frequency, after checking some of the Task object's fields.

Task Model

from django.db import models


class Task(models.Model):
    INITIAL = 0
    STARTED = 1
    COMPLETED = 2
    ERROR = 3  # assumed value; ERROR is used below but was never defined

    task_status = (
        (INITIAL, 'running'),
        (STARTED, 'running'),
        (COMPLETED, 'completed'),
        (ERROR, 'error')
    )

    FREQUENCY = (
        ('1', '1 hrs'),
        ('2', '2 hrs'),
        ('6', '6 hrs'),
        ('8', '8 hrs'),
        ('10', '10 hrs'),
    )

    name = models.CharField(max_length=255)
    scraping_end_date = models.DateField(null=True, blank=True)
    search_frequency = models.CharField(max_length=5, null=True, blank=True, choices=FREQUENCY)
    status = models.IntegerField(choices=task_status)

tasks.py

I want to run the view posted below periodically (period = the task's search_frequency) if the task status is 0 or 1 and the task's scraping end date has not passed. But I got stuck here. How can I do this?

@periodic_task(run_every=crontab(hour="task.search_frequency"))  # how to do  with task search_frequency value
def schedule_task(pk):
    task = Task.objects.get(pk=pk)
    if task.status in (0, 1) and datetime.date.today() <= task.scraping_end_date:

        # perform the crawl function ---> def crawl() how ??

        if task.scraping_end_date == datetime.date.today():
            task.status = 2
            task.save()  # change the task status as complete.

views.py

I want to run this view periodically. How can I do it?

class CrawlerHomeView(LoginRequiredMixin, View):
    login_url = 'users:login'

    def get(self, request, *args, **kwargs):
        # all_task = Task.objects.all().order_by('-id')
        frequency = Task()
        categories = Category.objects.all()
        targets = TargetSite.objects.all()
        keywords = Keyword.objects.all()

        form = CreateTaskForm()
        context = {
            'targets': targets,
            'keywords': keywords,
            'frequency': frequency,
            'form':form,
            'categories': categories,
        }
        return render(request, 'index.html', context)

    def post(self, request, *args, **kwargs):

        form = CreateTaskForm(request.POST)
        if form.is_valid():

            # try:
            unique_id = str(uuid4()) # create a unique ID. 
            obj = form.save(commit=False)

            # obj.keywords = keywords
            obj.created_by = request.user
            obj.unique_id = unique_id
            obj.status = 0
            obj.save()
            form.save_m2m()

            # Join the keyword titles into one comma-separated string.
            keywords = ', '.join(key.title for key in obj.keywords.all())
            # tasks = request.POST.get('targets')
            # targets = ['thehimalayantimes', 'kathmandupost']
            # print('$$$$$$$$$$$$$$$ keywords', keywords)

            task_ids = [] #one Task/Project contains one or multiple scrapy task

            settings = {
                'spider_count' : len(obj.targets.all()),
                'keywords' : keywords,
                'unique_id': unique_id, # unique ID for each record for DB
                'USER_AGENT': 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)'
            }

            # res = ast.literal_eval(ini_list) 

            for site_url in obj.targets.all():
                domain = urlparse(site_url.address).netloc # parse the url and extract the domain
                spider_name = domain.replace('.com', '')
                task = scrapyd.schedule('default', spider_name, settings=settings, url=site_url.address, domain=domain, keywords=keywords)

            # task = scrapyd.schedule('default', spider_name , settings=settings, url=obj.targets, domain=domain, keywords=obj.keywords)
            return redirect('crawler:task-list')
            # except:
            #     return render(request, 'index.html', {'form':form})
        return render(request, 'index.html', {'form':form, 'errors':form.errors})

Are there any suggestions or answers for this problem?

Upvotes: 2

Views: 1897

Answers (3)

hardik24

Reputation: 1058

For the error,

Exception Type: EncodeError
Exception Value:    
Object of type timedelta is not JSON serializable 

Instead of defining the following variable in your Django settings,

CELERY_BEAT_SCHEDULE = {
    'task-first': {
        'task': 'scheduler.tasks.create_task',
        'schedule': timedelta(minutes=1)
    },
}

could you try the following in your Celery file:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'task-first': {
        'task': 'scheduler.tasks.create_task',
        'schedule': crontab(minute='*/1')
    }
}

This works for me, provided the Celery worker is up and running.

Apart from this, why are you redirecting to 'list_tasks' after each task? What exactly does it do? Also, you call the Celery task from the view with add_task_celery.delay(name, date, freq). Is that just another way to add a task, apart from the periodic task defined using celery-beat?

Edit 1:

My setup looks as follows:

settings.py

CELERY_TIMEZONE = 'Asia/Kolkata'
CELERY_BROKER_URL = 'amqp://localhost'

celery.py

app.conf.beat_schedule = {
    'task1': {
        'task': '<app_name>.tasks.random_task',
        'schedule': crontab(minute=0, hour=0)
    },
}

Note that I have a file named tasks.py in my app folder, where I have written a shared task as follows:

from celery import shared_task


@shared_task
def random_task(total):
    ...

Apart from this, you should start both Celery beat and a Celery worker process as follows:

celery -A <project_name>.celery worker -l error
celery -A <project_name>.celery beat -l error --scheduler django_celery_beat.schedulers:DatabaseScheduler

You can use any scheduler you want; in production I use DatabaseScheduler. For testing, you can try the following command:

celery -A <project_name> beat -l info -S django

You should run all these commands from the project folder of the Django project.
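
A fixed beat schedule does not by itself follow each Task's search_frequency. One possible approach is a single dispatcher task that beat runs every hour and that decides which Task rows are due. The sketch below shows only that decision in plain Python. The last_run value is a hypothetical DateTimeField that the model in the question does not have, and is_due is an illustrative name, not part of Celery.

```python
from datetime import date, datetime, timedelta

# Status values mirror the Task model in the question.
INITIAL, STARTED, COMPLETED = 0, 1, 2


def is_due(status, search_frequency, scraping_end_date, last_run, now):
    """Return True if a task should be crawled at `now`.

    `last_run` is a hypothetical timestamp of the previous crawl
    (None if the task has never run); the original model has no such field.
    """
    # Only tasks that are initial or started are eligible.
    if status not in (INITIAL, STARTED):
        return False
    # Without a frequency there is nothing to schedule.
    if not search_frequency:
        return False
    # Skip tasks whose scraping end date has already passed.
    if scraping_end_date is not None and now.date() > scraping_end_date:
        return False
    # A task that has never run is due immediately.
    if last_run is None:
        return True
    # search_frequency is stored as a string of hours, e.g. '2'.
    return now - last_run >= timedelta(hours=int(search_frequency))
```

A shared task scheduled with crontab(minute=0) could then loop over the Task rows, call is_due for each one, start the crawl for those that are due, and store the new last_run value.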

Upvotes: 3

ACimander

Reputation: 1979

After fighting Celery for 5 years in a setup handling 15k tasks/second, I highly recommend switching to Dramatiq. It has a sane, reliable, performant code base that isn't split across multiple convoluted packages, and it has worked perfectly in two of my newer projects so far.

From the author's motivation:

I’ve used Celery professionally for years and my growing frustration with it is one of the reasons why I developed dramatiq. Here are some of the main differences between Dramatiq, Celery and RQ:

There's also a Django helper package: https://github.com/Bogdanp/django_dramatiq

Granted, you won't have a built-in celerybeat, but a cron job calling Python tasks is more robust anyway. We lost a good amount of data because celerybeat decided to stall regularly :)


There are two projects that aim to add periodic task creation: https://gitlab.com/bersace/periodiq and https://apscheduler.readthedocs.io/en/stable/

I haven't used those packages yet. With periodiq, you could select your database entries, loop through them, and define a periodic task for each one (but this requires regular restarts of the periodiq worker to pick up changes):

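# tasks.py
import dramatiq
from dramatiq import get_broker
from periodiq import PeriodiqMiddleware, cron

from .models import Task  # assumed location of the Task model

broker = get_broker()
broker.add_middleware(PeriodiqMiddleware(skip_delay=30))


for obj in Task.objects.all():
    # obj.frequency is assumed to hold a crontab string such as '0 * * * *'.
    # Actor names must be unique, so derive each one from the primary key.
    @dramatiq.actor(periodic=cron(obj.frequency), actor_name=f'task_{obj.pk}')
    def hourly(obj=obj):
        # import logic based on obj.name
        # Do something each hour…
        pass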
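
The model in the question stores search_frequency as a number of hours ('1', '2', '6', ...), not as a cron expression. A minimal sketch of the conversion follows, assuming that cron() accepts a standard five-field crontab string; hours_to_cron is a hypothetical helper, not part of periodiq.

```python
def hours_to_cron(search_frequency):
    """Turn an hour interval such as '2' into a five-field crontab string."""
    hours = int(search_frequency)
    if not 1 <= hours <= 23:
        raise ValueError("interval must be between 1 and 23 hours")
    # '*' in the hour field means every hour; '*/N' means every N-th hour.
    hour_field = "*" if hours == 1 else f"*/{hours}"
    # Fire at minute 0 of each matching hour, every day.
    return f"0 {hour_field} * * *"
```

Cron steps restart at midnight, so an interval that does not divide 24 evenly is not truly periodic: '*/10' fires at 00:00, 10:00 and 20:00, and then again only four hours later at 00:00.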

Upvotes: 9

Gaurav Jain

Reputation: 1865

I believe the problem is with the 2nd and 3rd parameters in the task definition, freq and date. From the error you posted, Object of type timedelta is not JSON serializable, it looks like it refers to the freq field, which is a DurationField and returns a timedelta object.

Ideally, both fields should be serialized before being passed to the task. Some simple ways to do this:

1) Explicitly serialize these fields, pass them to the task, and convert them back to datetime / timedelta objects inside the task.

Alternatively, you can dump the whole data dict if there are too many items (default=str turns dates and timedeltas into strings, which the task must parse back):

add_task_celery.delay(json.dumps(form.cleaned_data, default=str))

and then in the task do -> json.loads(...)

2) Another thing you can try is to pass the serializer explicitly in the parameters (using apply_async instead of delay):

add_task_celery.apply_async((name, date, freq), serializer='json')

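3) You can also set CELERY_TASK_SERIALIZER = 'json', if you haven't already. Note that 'json' has been the default since Celery 4.0 (earlier versions defaulted to 'pickle'), and the JSON serializer still cannot encode a timedelta by itself.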
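
Option 1 can be sketched with the standard library alone. The helper names encode_args and decode_args and the dict keys are illustrative assumptions; the idea is only that the view sends JSON-friendly values and the task rebuilds the date and timedelta objects.

```python
import json
from datetime import date, timedelta


def encode_args(name, end_date, freq):
    """Convert the task arguments to a JSON string (call this in the view)."""
    return json.dumps({
        "name": name,
        "date": end_date.isoformat(),          # date -> 'YYYY-MM-DD'
        "freq_seconds": freq.total_seconds(),  # timedelta -> float seconds
    })


def decode_args(payload):
    """Rebuild the original objects from the JSON string (call this in the task)."""
    data = json.loads(payload)
    return (
        data["name"],
        date.fromisoformat(data["date"]),
        timedelta(seconds=data["freq_seconds"]),
    )
```

With this, the view would pass encode_args(name, date, freq) to add_task_celery.delay, and the task would start by calling decode_args on the string it receives.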

Upvotes: 2
