Reputation: 8166
New to async.
I had a class called PGServer that you could iterate to get a list of database names:
class PGServer:
    # ...connection code omitted...
    def _databases( self ):
        curr = self.connection().cursor()
        curr.execute( "SELECT datname FROM pg_database ORDER BY datname" )
        return [ row[ 0 ] for row in curr.fetchall() ]
    def __iter__( self ):
        return iter( self._databases() )
dbs = PGServer( ...connection params... )
for db in dbs:
    print( db )
It worked fine with psycopg. Then I switched to asyncpg:
class PGServer:
    # ...connection code omitted...
    async def _databases( self ):
        conn = await self.connection()
        rows = await conn.fetch( "SELECT datname FROM pg_database ORDER BY datname" )
        return [ row[ 0 ] for row in rows ]
dbs = PGServer( ...connection params... )
for db in await dbs._databases():
    print( db )
It works when I invoke _databases() directly, but how do I get __iter__ working again? I can't make it async because that violates the protocol. I tried implementing __aiter__ instead, but couldn't figure out how to make that work.
Some implementations that I tried:
async def __aiter__( self ):
    #return self._databases()
    #return await self._databases()
    #return aiter( self._databases() )
    return aiter( await self._databases() )
Those all generated the following error:
TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine
I just created an implementation that seems to work:
async def __aiter__( self ):
    for db in await self._databases():
        yield db
I don't know if that's optimal or idiomatic, though.
Unless someone can come up with something better, I'm just going to give up on having an __iter__ and saying for db in dbs:, and instead just be more explicit:
for db in await dbs.databases():
    ...
(I dropped the underscore because in this new context databases() is now the public API.)
Upvotes: 1
Views: 84
Reputation: 726
__aiter__ and __anext__
This won't solve the underlying problem (i.e., the code not being properly asynchronous), but it should make it viable to mix it with an asynchronous interface.
The problem here is that you're trying to wrap an asynchronously defined (but not properly async) iterator (the newly defined async _databases) with a synchronous method, __iter__. As you've stated, the solution is to implement the asynchronous counterpart of that method.
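As a side note on the TypeError in the question, here is a minimal sketch of why "async def __aiter__" with a return fails while the version with yield works. The class names are made up for illustration. An async def without yield is a coroutine function, so calling it hands back a coroutine object, which has no __anext__. With a yield in the body it becomes an async generator function, and the object it returns does implement __anext__.

```python
import asyncio


class ReturnsCoroutine:
    async def __aiter__(self):  # no yield: a plain coroutine function
        return iter(["a", "b"])


class AsyncGenerator:
    async def __aiter__(self):  # has yield: an async generator function
        for name in ["a", "b"]:
            yield name


# Call __aiter__ by hand to see what 'async for' would receive.
coro = ReturnsCoroutine().__aiter__()
agen = AsyncGenerator().__aiter__()
coro_has_anext = hasattr(coro, "__anext__")
agen_has_anext = hasattr(agen, "__anext__")
coro.close()  # tidy up so Python does not warn about an un-awaited coroutine


async def main():
    # The async generator can be consumed directly with 'async for'.
    return [name async for name in AsyncGenerator()]


names = asyncio.run(main())
print(coro_has_anext, agen_has_anext, names)
```

So none of the return-based attempts can work as long as __aiter__ itself is declared with async def; either make it a plain def that returns an async iterator, or keep async def and use yield.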
__aiter__ should return the asynchronous iterator itself, so, as your previous code simply forwards _databases as an iterator:
...
class PGServer:
    def __init__(self, *args, **kwargs):
        # other __init__ stuff
        self.results = None  # holds the fetched names while iterating
    async def _databases( self ):
        conn = await self.connection()
        rows = await conn.fetch( "SELECT datname FROM pg_database ORDER BY datname" )
        return [ row[ 0 ] for row in rows ]
    def __aiter__( self ):  # the object is its own async iterator
        return self
    async def __anext__(self):
        if self.results is None:  # first call: fetch everything once
            self.results = await self._databases()
        if not self.results:  # exhausted, or the query returned nothing
            self.results = None  # on finish reset to None
            raise StopAsyncIteration
        return self.results.pop(0)
...
should work fine.
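To try the pattern without a database, here is a self-contained sketch of the same idea. FakeConnection and its hard-coded rows are hypothetical stand-ins for the asyncpg connection (they are not asyncpg's API), and the PGServer here takes no connection parameters.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a database connection (not asyncpg)."""

    async def fetch(self, query):
        await asyncio.sleep(0)  # pretend to wait on the network
        return [("postgres",), ("template0",), ("template1",)]


class PGServer:
    def __init__(self):
        self.results = None  # holds the fetched names while iterating

    async def connection(self):
        return FakeConnection()

    async def _databases(self):
        conn = await self.connection()
        rows = await conn.fetch("SELECT datname FROM pg_database ORDER BY datname")
        return [row[0] for row in rows]

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.results is None:  # first call: fetch everything once
            self.results = await self._databases()
        if not self.results:  # exhausted, or nothing was returned
            self.results = None  # reset so the object can be iterated again
            raise StopAsyncIteration
        return self.results.pop(0)


async def main():
    dbs = PGServer()
    first = [db async for db in dbs]
    second = [db async for db in dbs]  # a second pass triggers a new fetch
    return first, second


first, second = asyncio.run(main())
print(first)
```

One thing to keep in mind with this design: the iteration state lives on the PGServer object itself, so two overlapping async for loops over the same instance would share (and drain) the same list.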
To make an asynchronous iterator in Python you need to define both __aiter__ and __anext__.
__aiter__ is called when you use the async for loop, and it should return the iterator itself.
So if PGServer is the actual iterator it should look like:
...
    def __aiter__( self ):
        return self
...
But let's look at it more closely: does it look like it's really iterating asynchronously, or is it merely being called asynchronously?
As it is, the code you've provided only awaits the fetch method of the db interface you're using. So all results from the query are returned at once as a SYNCHRONOUS iterator; the asynchronous part is only the wait for the db to resolve.
As mentioned, the __aiter__ method only forwards the iterator itself; for the iterator to yield asynchronously you need to define an __anext__ method. That will be called each time a new element of the iterator is requested. It should return the next element of the iterator, or raise StopAsyncIteration if the iteration has finished.
So, to use _databases as an async iterator in this case you'd need to first define some method that constructs a properly async iterator/generator (i.e., one that returns or yields the requested values asynchronously) and then define an __anext__ somewhat like:
...
    async def __anext__(self):
        if ITERATOR_IS_EXHAUSTED:
            raise StopAsyncIteration
        #pop item from iterator/generator and return/yield it
...
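To make that template concrete, here is a rough sketch of an iterator that really does wait between items by fetching the names one page at a time. Everything in it is an assumption for illustration: fetch_page, DATABASES and DatabaseNames are made-up names, and the "server" is just a list. With a real driver the awaited step would be whatever incremental fetch it offers.

```python
import asyncio

# Hypothetical data standing in for the rows the server would return.
DATABASES = ["app", "postgres", "template0", "template1", "test"]


async def fetch_page(offset, size):
    """Hypothetical stand-in for one round trip to the server."""
    await asyncio.sleep(0)  # pretend to wait on the network
    return DATABASES[offset:offset + size]


class DatabaseNames:
    def __init__(self, page_size=2):
        self.page_size = page_size
        self.offset = 0
        self.buffer = []
        self.exhausted = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Only go back to the "server" when the local buffer is empty.
        if not self.buffer and not self.exhausted:
            page = await fetch_page(self.offset, self.page_size)
            self.offset += len(page)
            self.buffer.extend(page)
            if len(page) < self.page_size:  # a short page means no more rows
                self.exhausted = True
        if not self.buffer:
            raise StopAsyncIteration
        return self.buffer.pop(0)


async def main():
    return [name async for name in DatabaseNames(page_size=2)]


names = asyncio.run(main())
print(names)
```

For a short list like database names this is probably overkill; fetching everything once and looping over the list is simpler. The paged form only pays off when the result set is large enough that you don't want it all in memory at once.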
The solution you've arrived at works because you've made a properly async generator from the synchronous iterator (i.e., the list) that _databases returns. But the iterator itself is still synchronous. I'd recommend you check:
Upvotes: 1