Reputation: 1745
I have 4 functions that build queries and execute them, and I want them to run concurrently using asyncio. My asyncio setup seems correct, since non-MongoDB tasks (for example asyncio.sleep()) behave as they should. Here is the code:
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [
    service.async_get_associate_opportunity_count_by_user(me, criteria),
    service.get_new_associate_opportunity_count_by_user(me, criteria),
    service.async_get_associate_favorites_count(me, criteria=dict()),
    service.get_group_matched_opportunities_count_by_user(me, criteria),
]
# gather() returns the results in the same order as the tasks
available, new, favorites, group_matched = loop.run_until_complete(asyncio.gather(*tasks))
stats['opportunities']['available'] = available
stats['opportunities']['new'] = new
stats['opportunities']['favorites'] = favorites
stats['opportunities']['group_matched'] = group_matched
loop.close()
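# functions written in another file
import asyncio
import time

# Opportunity is the mongoengine Document model defined elsewhere.
# Note: @asyncio.coroutine is the legacy generator-based coroutine decorator
# (deprecated in Python 3.8, removed in 3.11); newer code uses "async def".


@asyncio.coroutine
def async_get_associate_opportunity_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of available: {}".format(run_time))
    yield from asyncio.sleep(2)
    return opportunities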

@asyncio.coroutine
def get_new_associate_opportunity_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of new: {}".format(run_time))
    yield from asyncio.sleep(2)
    return opportunities


@asyncio.coroutine
def async_get_associate_favorites_count(self, user, criteria={}, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    favorites = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of favorites: {}".format(run_time))
    yield from asyncio.sleep(2)
    return favorites


@asyncio.coroutine
def get_group_matched_opportunities_count_by_user(self, user, criteria=None, **kwargs):
    start_time = time.time()
    query = ...  # query that gets built from some other functions
    opportunities = Opportunity.objects(query).count()
    run_time = time.time() - start_time
    print("runtime of group matched: {}".format(run_time))
    yield from asyncio.sleep(2)
    return opportunities
The yield from asyncio.sleep(2) is only there to show that the functions run asynchronously. Here is the output in the terminal:
runtime of group matched: 0.11431598663330078
runtime of favorites: 0.0029871463775634766
Timestamp function run time: 0.0004897117614746094
runtime of new: 0.15225648880004883
runtime of available: 0.13006806373596191
total run time: 2403.2700061798096

(The per-function runtimes are in seconds; the total is in milliseconds.) From my understanding, apart from the 2000 ms that the sleep calls add to the total, the total shouldn't exceed roughly 155-160 ms, since that is the longest run time among the functions.
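The expectation above can be checked against the printed numbers. The sketch below compares two hypotheses, assuming the four asyncio.sleep(2) calls overlap as they should: if the queries also overlapped, the total would be 2 s plus the slowest query; if they ran one after another, it would be 2 s plus the sum of all four. Only the four gathered queries are included; the small Timestamp function is left out because it is not one of the gathered tasks.

```python
# Per-query runtimes (seconds) copied from the terminal output above.
runtimes = {
    "group matched": 0.11431598663330078,
    "favorites": 0.0029871463775634766,
    "new": 0.15225648880004883,
    "available": 0.13006806373596191,
}
sleep_s = 2.0                      # every coroutine sleeps 2 s; the sleeps overlap
observed_ms = 2403.2700061798096   # "total run time" from the output

# Hypothesis 1: queries overlap, so only the slowest one adds to the sleep.
concurrent_ms = (sleep_s + max(runtimes.values())) * 1000
# Hypothesis 2: queries run one after another, so their times add up.
serial_ms = (sleep_s + sum(runtimes.values())) * 1000

print(f"expected if concurrent: {concurrent_ms:.0f} ms")  # → 2152 ms
print(f"expected if serial:     {serial_ms:.0f} ms")      # → 2400 ms
print(f"observed:               {observed_ms:.0f} ms")    # → 2403 ms
```

The serial estimate lands within a few milliseconds of the observed total, which points to the queries executing one after another rather than concurrently.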
I'm currently looking into motorengine (a port of mongoengine 0.9.0), which apparently enables asynchronous MongoDB queries, but I don't think I'll be able to use it, since my models are defined with mongoengine. Is there a workaround for this problem?
Upvotes: 0
Views: 1595
Reputation: 5173
Your queries aren't running in parallel because Opportunity.objects(query).count() does blocking IO: whenever one of your coroutines calls it, the entire event loop blocks until MongoDB responds, so none of the other coroutines can make progress in the meantime.
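A small, self-contained way to see this effect without MongoDB: in the sketch below, time.sleep stands in for a blocking driver call such as count(), and asyncio.sleep stands in for a driver that awaits its network I/O. The function names and the 0.1 s delay are purely illustrative.

```python
import asyncio
import time


async def blocking_query(name, seconds):
    # time.sleep never yields to the event loop, like a blocking
    # mongoengine .count(); other coroutines must wait for it.
    time.sleep(seconds)
    return name


async def non_blocking_query(name, seconds):
    # asyncio.sleep yields to the event loop while "waiting",
    # like an async driver awaiting its network I/O.
    await asyncio.sleep(seconds)
    return name


async def timed_gather(query_fn):
    # Run four "queries" through gather() and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(query_fn(f"q{i}", 0.1) for i in range(4)))
    return time.perf_counter() - start


blocking_s = asyncio.run(timed_gather(blocking_query))
non_blocking_s = asyncio.run(timed_gather(non_blocking_query))
print(f"blocking:     {blocking_s:.2f} s")      # at least 4 x 0.1 = 0.4 s
print(f"non-blocking: {non_blocking_s:.2f} s")  # close to 0.1 s
```

gather() alone does not make anything concurrent; it only interleaves coroutines at their await points, and a blocking call has none.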
So you need a MongoDB driver that can do asynchronous (non-blocking) IO. You are on the right path in trying motorengine, but as far as I can tell it's written for the Tornado asynchronous framework. To get it working with asyncio you would have to hook Tornado up to asyncio; see http://tornado.readthedocs.org/en/latest/asyncio.html for how to do that.
Another option is to use asyncio-mongo, but it doesn't have a mongoengine-compatible ORM, so you might have to rewrite most of your code.
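A different workaround, not mentioned in the answer above, is to keep mongoengine and move its blocking calls onto a thread pool with loop.run_in_executor, so the waits overlap in worker threads instead of blocking the event loop. This is not non-blocking I/O; it trades the async driver for threads. The sketch assumes the underlying client can be shared across threads (PyMongo's MongoClient is documented as thread-safe), and count_opportunities is a hypothetical stand-in that simulates a query with time.sleep.

```python
import asyncio
import time


def count_opportunities(label, seconds):
    # Hypothetical stand-in for a blocking mongoengine call such as
    # Opportunity.objects(query).count(); time.sleep simulates the round trip.
    time.sleep(seconds)
    return label


async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Passing None uses the loop's default ThreadPoolExecutor, so the four
    # blocking calls run in separate threads and their waits overlap.
    results = await asyncio.gather(
        loop.run_in_executor(None, count_opportunities, "available", 0.1),
        loop.run_in_executor(None, count_opportunities, "new", 0.1),
        loop.run_in_executor(None, count_opportunities, "favorites", 0.1),
        loop.run_in_executor(None, count_opportunities, "group_matched", 0.1),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['available', 'new', 'favorites', 'group_matched']
print(f"elapsed: {elapsed:.2f} s")
```

On Python 3.9 and later, asyncio.to_thread(count_opportunities, "available", 0.1) is a shorthand for the same idea. With a real database, the achievable overlap also depends on the driver's connection pool size.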
Upvotes: 3