Reputation: 65
One of the async function returns the async generator object. I added loop.run_until_complete(func()), but still, it throws the error as "TypeError: A Future, a coroutine or an awaitable is required". Below is the code. I'm trying to fetch the records from Neo4j asynchronously. I got the async "Neo4j class from a GitHub. I'm new to this async concept.
from concurrent import futures
import neo4j
from neo4j import GraphDatabase, basic_auth
import time
import traceback
import asyncio
RETRY_WAITS = [0, 1, 4] # How long to wait after each successive failure.
class Neo4j:
"""Neo4j database API."""
def __init__(self, config, loop):
self.config = config
self.loop = loop
self.executor = futures.ThreadPoolExecutor(max_workers=30)
for retry_wait in RETRY_WAITS:
try:
self.init_driver()
break
except:
if retry_wait == RETRY_WAITS[-1]:
raise
else:
print('WARNING: retrying to Init DB; err:')
traceback.print_exc()
time.sleep(retry_wait) # wait for 0, 1, 3... seconds.
def init_driver(self):
auth = basic_auth(self.config['user'], self.config['pass'])
self.driver = GraphDatabase.driver(self.config['url'], auth=auth)
async def afetch_start(self, query):
session = self.driver.session(access_mode=neo4j.READ_ACCESS)
def run():
return session.run(query).records()
return session, await self.loop.run_in_executor(self.executor, run)
async def afetch_iterate(self, session, iter):
def iterate():
try:
return next(iter)
except StopIteration:
return None
while True:
res = await self.loop.run_in_executor(self.executor, iterate)
if res is None:
return
else:
yield dict(res)
async def afetch(self, query):
for retry_wait in RETRY_WAITS:
try:
session, iter = await self.afetch_start(query)
break
except (BrokenPipeError, neo4j.exceptions.ServiceUnavailable) as e:
if retry_wait == RETRY_WAITS[-1]:
raise
else:
await asyncio.sleep(retry_wait)
await self.loop.run_in_executor(self.executor, self.init_driver)
async for x in self.afetch_iterate(session, iter):
yield x
await self.loop.run_in_executor(self.executor, session.close)
async def afetch_one(self, query):
async for i in self.afetch(query):
return i
return None
async def aexec(self, query):
async for i in self.afetch(query):
pass
return
config={'url':"bolt://localhost",'user':'neo4j','pass':'pwd'}
loop=asyncio.get_event_loop()
n=Neo4j(config,loop)
loop.run_until_complete(n.afetch("MATCH(p:Person)-[:Acted_in]->(mv:Movies) RETURN p.name as actors"))
loop.close()
--EDIT
I have modified the code to work properly. The query returns 218K rows and it takes 5 minutes to extract the complete list and the same async operation in C# completes in just 2 sec. Looks like the above code still doesnt go in async
Upvotes: 1
Views: 4555
Reputation: 39546
It's very hard to tell what exactly happens without reproducible example, but I'll take a guess. You probably pass async generator object in a loop, you shouldn't do it. A way to work with async generators is to use async for
. Here's example:
import asyncio
async def func(): # async generator
yield 1
yield 2
yield 3
async def main():
async for i in func(): # get values from async generator
print(i)
asyncio.run(main()) # can be used instead of loop.run_until_complete(main())
Upvotes: 1