ford prefect

Reputation: 7388

Collecting futures from APScheduler

I have the following code:

import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler 


class FutureScheduler(object):

    def __init__(self):
        self.futures = []
        self.scheduler = BlockingScheduler()
        self.pool = ThreadPoolExecutor(5)
        self.full_frame = pd.DataFrame()

    def start(self):
        job = self.scheduler.add_job(self.add_future, 'cron', day_of_week='mon-fri', hour='8-15', minute='*')
        self.scheduler.start()
        self.flush_csvs()

    def add_future(self):
        self.futures.append(self.pool.submit(self.long_running_task))

    def flush_csvs(self):
        for future in as_completed(self.futures):
            results = future.result()
            self.full_frame = pd.concat((self.full_frame, results))
            self.full_frame.to_csv('results.csv')
            print "flushed... Queue size: %s" % len(self.futures)

    def long_running_task(self):
        # Takes a while; may or may not return before the next run is kicked off.
        pass

So the problem I have is that the code inside the flush_csvs loop never runs. Do I have to add all of the futures to the list before as_completed is called? Is there a way to have the BlockingScheduler return a future? I see that it returns a Job, but in this case I want something more future-like.

Upvotes: 1

Views: 403

Answers (1)

Cody A. Ray

Reputation: 6017

That doesn't work because the scheduler blocks the main thread from continuing, which prevents flush_csvs from ever executing:

self.scheduler.start()
self.flush_csvs()
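
To see the blocking in isolation, here is a tiny standalone sketch (purely illustrative, not your code): start() on a BlockingScheduler does not return until the scheduler is shut down, so nothing placed after it ever runs.

from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print("tick")

scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=5)
scheduler.start()  # blocks here until scheduler.shutdown() is called elsewhere
print("never reached while the scheduler is running")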

However, this is probably not what you want anyway. APScheduler uses a thread pool internally, so the callback (self.long_running_task) is already executed in a separate thread.
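
One hedged sketch of that idea (fetch_frame, flush_job, and the cron settings are placeholders, not APScheduler API): let the scheduled job do the work and the CSV flush itself, instead of collecting futures on the main thread.

import pandas as pd
from threading import Lock
from apscheduler.schedulers.blocking import BlockingScheduler

full_frame = pd.DataFrame()
frame_lock = Lock()
scheduler = BlockingScheduler()

def fetch_frame():
    # Placeholder for the long-running work; returns a DataFrame.
    return pd.DataFrame({'ts': [pd.Timestamp.now()]})

def flush_job():
    global full_frame
    results = fetch_frame()
    with frame_lock:  # runs may overlap, so guard the shared frame
        full_frame = pd.concat((full_frame, results))
        full_frame.to_csv('results.csv')
    print("flushed... rows: %s" % len(full_frame))

scheduler.add_job(flush_job, 'cron', day_of_week='mon-fri', hour='8-15', minute='*')
scheduler.start()  # blocks here; flush_job runs in APScheduler's own thread pool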

You can change the configuration of this thread pool through APScheduler depending on the number of workers you need, whether you need multiple cores (use a ProcessPoolExecutor instead of a ThreadPoolExecutor), and so on. You can also configure each job to behave the way you want. For example, you can configure a job that runs once a minute to coalesce (run only once) instead of running multiple times back-to-back when runs are delayed.

http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s
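
A hedged configuration sketch based on that user guide (the pool sizes and job defaults are just example values, and some_cpu_bound_task is a placeholder):

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler

executors = {
    'default': ThreadPoolExecutor(10),      # thread pool for ordinary jobs
    'processpool': ProcessPoolExecutor(4),  # separate pool for CPU-bound jobs
}
job_defaults = {
    'coalesce': True,      # collapse missed runs into a single run
    'max_instances': 1,    # don't start a run while the previous one is still going
}

scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults)

# A job can then be routed to the process pool when it needs its own core:
# scheduler.add_job(some_cpu_bound_task, 'cron', minute='*', executor='processpool')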

Upvotes: 1
