Reputation: 7388
I have the following code:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
class FutureScheduler(object):
    def __init__(self):
        self.futures = []
        self.scheduler = BlockingScheduler()
        self.pool = ThreadPoolExecutor(5)
        self.full_frame = pd.DataFrame()

    def start(self):
        job = self.scheduler.add_job(self.add_future, 'cron',
                                     day_of_week='mon-fri', hour='8-15', minute='*')
        self.scheduler.start()
        self.flush_csvs()

    def add_future(self):
        self.futures.append(self.pool.submit(self.long_running_task))

    def flush_csvs(self):
        for future in as_completed(self.futures):
            results = future.result()
            self.full_frame = pd.concat((self.full_frame, results))
            self.full_frame.to_csv('results.csv')
            print("flushed... Queue size: %s" % len(self.futures))

    def long_running_task(self):
        # takes a while; may or may not return before the next one is kicked off
        pass
The problem I have is that the code inside the `flush_csvs` loop never runs. Do I have to add all the futures to the list before `as_completed` is called? Is there a way to have the `BlockingScheduler` return a future? I see it returns a `Job`, but in this case I want something more future-like.
Upvotes: 1
Views: 403
Reputation: 6017
That doesn't work because `BlockingScheduler.start()` blocks the main thread, which prevents `flush_csvs` from ever being executed:

    self.scheduler.start()   # blocks here until the scheduler is shut down
    self.flush_csvs()        # never reached
However, this is probably not what you want anyway. APScheduler runs job callbacks in a thread pool of its own, so `self.long_running_task` already executes in a separate thread.
You can configure that pool through APScheduler: change the number of workers, or swap in a ProcessPoolExecutor instead of a ThreadPoolExecutor if you need multiple cores. Individual jobs can be configured as well; for example, a job that runs once a minute can be set to coalesce (run only once) instead of running several times back-to-back after a delay.
Upvotes: 1