Reputation: 3682
I'm still new to tornado's async programming. So I'm not 100% certain I did this the right way.
Here is the script i'm trying to run async
import locale
import logging
from tornado.gen import Task, Return, coroutine
from datetime import datetime, timedelta
from dateutil import tz
from mongo import db
@coroutine
def table():
response = yield Task(driver)
raise Return(response)
@coroutine
def driver():
try:
locale.setlocale(locale.LC_ALL, 'en_US')
except:
locale.setlocale(locale.LC_ALL, 'en_US.utf8')
from_zone = tz.gettz('UTC')
to_zone = tz.gettz('America/New_York')
utc = datetime.utcnow()
utc = utc.replace(tzinfo=from_zone)
current_date = utc.astimezone(to_zone)
start_at = current_date + timedelta(days=-2)
end_at = current_date + timedelta(days=-1)
start_date = start_at.strftime("%Y-%m-%d")
start_day = start_at.strftime("%a")
end_date = end_at.strftime("%Y-%m-%d")
end_day = end_at.strftime("%a")
year = end_at.strftime("%Y")
prev_year = year
month = end_at.strftime("%m")
prev_month = int(month) - 1
if prev_month < 1:
prev_month = "12"
prev_year = str(int(prev_year) - 1)
else:
if prev_month < 10:
prev_month = "0%s" % prev_month
prev_month = str(prev_month)
dates = {start_date:{}}
while start_at < end_at:
end_slot = start_at + timedelta(days=1)
start_at += timedelta(days=1)
dates[end_slot.strftime("%Y-%m-%d")] = {}
logging.info('getting sites from db')
sites = list(db.sites.find())
revenue_totals = {}
provider_totals = {}
rev_providers = {}
# slugs, tags, name, PLACEMENT, TOTAL_IMPRESSIONS, FILLED_IMPRESSIONS, DEFAULTED_IMPRESSIONS, REVENUE, DATE, NETWORK, CTR, AD_SIZE, LOCATION, CPM
for date in dates.keys():
logging.info('retrieving data for {0}'.format(date))
records = list(db.revenue_data.find({'date': date}))
logging.info('data recieved, parsing...')
for record in records:
for rec in record['records']:
rec['slugs'] = []
try:
rec['REVENUE'] = float(rec['REVENUE'].replace('$','').replace(',',''))
except:
rec['REVENUE'] = rec['REVENUE']
# add to our revenue provider tallies
if rec['PLACEMENT'].find('l2') > -1 or rec['PLACEMENT'].find('2nd') > -1 or rec['PLACEMENT'].find('second') > -1:
rec['NETWORK'] = '%s_2nd' % rec['NETWORK']
if rec['NETWORK'].lower() in rev_providers.keys():
if date in rev_providers[rec['NETWORK'].lower()].keys():
if float(rec['REVENUE']) > 0.0:
rev_providers[rec['NETWORK'].lower()][date]['revenue'] += rec['REVENUE']
rev_providers[rec['NETWORK'].lower()][date]['count'] += 1
else:
if float(rec['REVENUE']) > 0.0:
rev_providers[rec['NETWORK'].lower()][date] = {'revenue':rec['REVENUE'],'count':1}
else:
if float(rec['REVENUE']) > 0.0:
rev_providers[rec['NETWORK'].lower()] = {date:{'revenue':rec['REVENUE'],'count':1}}
if date in provider_totals:
if float(rec['REVENUE']) > 0.0:
provider_totals[date]['revenue'] += rec['REVENUE']
provider_totals[date]['count'] += 1
else:
if float(rec['REVENUE']) > 0.0:
provider_totals[date] = {'revenue':rec['REVENUE'],'count':1}
# determine what slugs match this line item
for site in sites:
append_slug = False
if 'reporting' in site.keys():
for r in site['reporting']:
if r.strip() != '' and rec['PLACEMENT'].replace(' ','').lower().find(r.strip().lower()) > -1:
append_slug = True
if 'prevent_reporting' in site.keys():
for r in site['prevent_reporting']:
if r.strip() != '' and rec['PLACEMENT'].replace(' ','').lower().find(r.strip().lower()) > -1:
append_slug = False
if append_slug:
rec['slugs'].append(site['slug'])
try:
rec['tags'] = site['tags']
except:
rec['tags'] = []
rec['name'] = site['name']
if 'name' in rec.keys() and float(rec['REVENUE']) > 0.0:
# this is a line item we want to keep
# determine if we have this site for this date yet
if rec['name'] in dates[date].keys():
# we have this site for this date; so let's just add this line item to it
dates[date][rec['name']].append(rec)
else:
# we need to add this site for this date
dates[date][rec['name']] = [rec]
if date in revenue_totals.keys():
revenue_totals[date] += float(rec['REVENUE'])
else:
revenue_totals[date] = float(rec['REVENUE'])
mtd_sum = 0.0
pmtd_sum = 0.0
rp_html = ''
rp_html += '<p>%s generated <strong>%s</strong> in revenue.</p>' % (end_date, locale.currency(revenue_totals[end_date], grouping=True))
rp_html += '<p>%s generated <strong>%s</strong> in revenue.</p>' % (start_date, locale.currency(revenue_totals[start_date], grouping=True))
rp_html += '<p>The following table shows the breakdown over these two days.</p>'
rp_html += '<p></p>'
rp_html += '<table border="1">'
rp_html += '<tr>'
rp_html += '<th>NETWORK</th>'
rp_html += '<th>%s</th>' % start_date
rp_html += '<th>TAGS</th>'
rp_html += '<th>%s</th>' % end_date
rp_html += '<th>TAGS</th>'
rp_html += '<th>MTD</th>'
rp_html += '<th>PM</th>'
rp_html += '</tr>'
keys = sorted(rev_providers.keys())
logging.info('creating table')
for network in keys:
rp_html += '<tr>'
rp_html += '<td>%s</td>' % network
# show the start date details
try:
rp_html += '<td align="right">%s</td>' % locale.currency(rev_providers[network][start_date]['revenue'], grouping=True)
# log in the revenue provider history
date_bits = start_date.split('-')
db.revenue_provider_history.update({'network':network, 'date':start_date}, {'network':network,'date':start_date,'revenue':rev_providers[network][start_date]['revenue'],'count':rev_providers[network][start_date]['count'],'day':date_bits[2],'month':date_bits[1],'year':date_bits[0],'day_of_week':start_day}, upsert=True)
except:
rp_html += '<td align="right">$0.00</td>'
try:
rp_html += '<td align="right">%s</td>' % rev_providers[network][start_date]['count']
except:
rp_html += '<td align="right">0</td>'
# show the end date details
try:
rp_html += '<td align="right">%s</td>' % locale.currency(rev_providers[network][end_date]['revenue'], grouping=True)
# log in the revenue provider history
date_bits = end_date.split('-')
db.revenue_provider_history.update({'network':network, 'date':end_date}, {'network':network,'date':end_date,'revenue':rev_providers[network][end_date]['revenue'],'count':rev_providers[network][end_date]['count'],'day':date_bits[2],'month':date_bits[1],'year':date_bits[0],'day_of_week':end_day}, upsert=True)
except:
rp_html += '<td align="right">$0.00</td>'
try:
rp_html += '<td align="right">%s</td>' % rev_providers[network][end_date]['count']
except:
rp_html += '<td align="right">0</td>'
# get the MTD total
mtd_results = db.revenue_provider_history.aggregate([
{'$match':{'network':network,'year':year,'month':month}},
{'$group':{'_id':{'network':'$network'}, 'total':{'$sum':'$revenue'}}}
])
for res in mtd_results:
mtd_rec = res
try:
rp_html += '<td align="right">%s</td>' % locale.currency(mtd_rec['total'], grouping=True)
mtd_sum += mtd_rec['total']
except:
rp_html += '<td>N/A</td>'
# get the PMTD total
mtd_results = db.revenue_provider_history.aggregate([
{'$match':{'network':network,'year':prev_year,'month':prev_month}},
{'$group':{'_id':{'network':'$network'}, 'total':{'$sum':'$revenue'}}}
])
for res in mtd_results:
mtd_rec = res
try:
rp_html += '<td align="right">%s</td>' % locale.currency(mtd_rec['total'], grouping=True)
pmtd_sum += mtd_rec['total']
except:
rp_html += '<td>N/A</td>'
rp_html += '</tr>'
rp_html += '<tr>'
rp_html += '<td> </td>'
rp_html += '<td align="right">%s</td>' % locale.currency(provider_totals[start_date]['revenue'], grouping=True)
rp_html += '<td align="right">%s</td>' % provider_totals[start_date]['count']
rp_html += '<td align="right">%s</td>' % locale.currency(provider_totals[end_date]['revenue'], grouping=True)
rp_html += '<td align="right">%s</td>' % provider_totals[end_date]['count']
rp_html += '<td align="right">%s</td>' % locale.currency(mtd_sum, grouping=True)
rp_html += '<td align="right">%s</td>' % locale.currency(pmtd_sum, grouping=True)
rp_html += '</tr>'
rp_html += '</table>'
logging.info("complete")
return rp_html
def on_timeout():
logging.info("timeout")
Here is the post method/handler
#needs work not truely async
class ProviderTable(app.basic.BaseHandler):
@tornado.web.authenticated
@coroutine
def post(self):
seconds_to_wait = 300
deadline = time.time() + seconds_to_wait
IOLoop.instance().add_timeout(deadline, generate_provider_rev_table.on_timeout)
notes = {}
logging.info('running table script')
result = yield generate_provider_rev_table.table()
if result != '':
notes['created'] = datetime.utcnow()
notes['slug'] = ''
notes['value'] = ''
notes['logged_by'] = 'systems'
notes['message'] = 'Revenue table generated at {0}'.format(datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
audit_notesdb.log_audit_note(notes)
self.api_response(result)
So this is what I'm trying to do.. I want to run the driver() method, which I'm running using the table() method, which I assume is async... I then yield the result which should be the contents contained in the rp_html table from the driver method. When I hit the post method, the script runs but it's still blocking my thread. What I'm I doing wrong and how do I fix it?
Upvotes: 0
Views: 159
Reputation: 91
Your driver
function is blocking. It blocks because you're using mongo
lib and logging
module, which are synchronous. So, when you call
sites = list(db.sites.find())
or
logging.info('retrieving data for {0}'.format(date))
your loop actually is blocked waiting for the response.
So, logging
. There is a built-in logging in tornado
(see here), you need only to set your custom logging function in app settings, but it's also blocking. You may try using queues, e.g. logs are put into some queue, then some worker (async) puts them into stdout: every ioloop
iteration the worker will start (this way) and flush everything.
For mongo
non-blocking stuff use for example motor
library, like this:
from motor.motor_tornado import MotorClient
db = MotorClient().db
cursor = db.sites.find()
for document in (yield cursor.to_list(length=None)):
# do something with the current record
Upvotes: 3