Reputation: 503
For my flask app I have a folder called 'flaskblog'. In the same directory I have run.py:
from flaskblog import app, db
db.init_app(app)
if __name__ == '__main__':
app.run(debug=True, port=5000)
Inside flaskblog folder init.py:
#...some more code
from flask import Flask
app = Flask(__name__)
app.config.from_object(Config)
#.. and so on
def register_app(app):
"""Create an application."""
main_blueprint = Blueprint('main', __name__)
app.register_blueprint(main_blueprint)
return app
register_app(app)
# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
# define the job
from flaskblog.models import Chatmessage
from datetime import datetime, timedelta
def hello_job():
today = datetime.today()
start_date = today - timedelta(days=1)
end_date = today + timedelta(days=1)
messages = Chatmessage.query.filter(Chatmessage.date_sent <= end_date).\
filter(Chatmessage.date_sent >= start_date).all()
print(today, start_date, end_date)
print('Hello Job! The time is: ')
# init BackgroundScheduler job
scheduler = BackgroundScheduler()
# in your case you could change seconds to hours
scheduler.add_job(hello_job, trigger='interval', seconds=10)
scheduler.start()
#find correct place to put job starts
I am trying to schedule a function to run every now and then like explained in this question: How to schedule a function to run every hour on Flask?
The problem is that I want to use models, Flask-SQLAlchemy.. like Model.query... but it cannot be used before applications is registered "register_app(app)" because of circular imports. So I put schedule after comment "#import BackgroundScheduler" in my code.
Is it a correct way to do it? I get the doubt from reloading visual code debugger and seeing that function hello_job() from last reload is still running, or if app fails to load function hello_job() starts to run anyways. So am afraid it could have issues on production server.
Upvotes: 0
Views: 931
Reputation: 63
You can do something like this
in your init.py
...
scheduler = BackgroundScheduler()
scheduler.start()
atexit.register(lambda: scheduler.shutdown())
...
#at the end
import job
in a job.py
from init import scheduler
cron_time = '0 * * * *'
@scheduler.scheduled_job(CronTrigger.from_crontab(cron_time), id='hello_job')
def hello_job():
...
Upvotes: 1