Reputation: 9465
Example (question follows):
import asyncio
import typing as t

from aiokafka import AIOKafkaConsumer


class KafkaSimpleClient:
    ...

    async def receive(self, topic: str) -> ???:
        bootstrap_servers = ','.join(
            '{}:{}'.format(host, port)
            for host, port in self._bootstrap_servers
        )
        consumer = AIOKafkaConsumer(
            loop=asyncio.get_event_loop(),
            bootstrap_servers=bootstrap_servers,
            metadata_max_age_ms=5000,
        )
        consumer.subscribe(pattern=topic)
        await consumer.start()
        return consumer
Now, I'm struggling with the return type of receive (it returns something that can be iterated over with async for x in y). What is it? Is it an awaitable iterator? Is it an iterator over awaitables? Perhaps something else entirely?
??? = t.Awaitable[t.Iterator]
??? = t.Iterator[t.Awaitable]
??? = (Something else)
Upvotes: 3
Views: 2038
Reputation: 171
As of Python 3.9, typing.AsyncIterable and typing.AsyncIterator are deprecated aliases to their abstract base class (ABC) counterparts in collections.abc. The ABCs can also be subscripted with the type of the iterator's items, which allows the type of those items to be inferred when iterating.
From the docs for typing.AsyncIterable:
Deprecated since version 3.9: collections.abc.AsyncIterable now supports [].
Thus, using the new ABC types and subscripts, the return type of receive becomes:
import collections.abc as abc
from aiokafka.structs import ConsumerRecord

async def receive(self, topic: str) -> abc.AsyncIterable[ConsumerRecord]:
    ...
# or
async def receive(self, topic: str) -> abc.AsyncIterator[ConsumerRecord]:
    ...
(Where ConsumerRecord is the type of items supplied by the AIOKafkaConsumer iterator, as documented.)
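To illustrate how the subscripted ABC lets a type checker infer the item type, here is a minimal sketch that does not depend on aiokafka. The names count_up and collect are hypothetical, invented only for this illustration, and Python 3.9 or later is assumed.

```python
import asyncio
from collections.abc import AsyncIterator


async def count_up(limit: int) -> AsyncIterator[int]:
    # An async generator: calling it returns an AsyncIterator[int].
    for i in range(limit):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield i


async def collect(items: AsyncIterator[int]) -> list[int]:
    # A type checker can infer `item` as int from the subscript.
    result = []
    async for item in items:
        result.append(item)
    return result


numbers = asyncio.run(collect(count_up(3)))
```

The same inference would apply to ConsumerRecord items when the annotation is abc.AsyncIterator[ConsumerRecord].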
However, the same docs for AIOKafkaConsumer tell us:
Failure to call AIOKafkaConsumer.stop() after consumer use will leave background tasks running.
Thus, in this specific example, the return type of receive should probably not abstract away the concrete type, since the caller must be aware that the return value is a resource that must be cleaned up and not just any iterator.
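One way to keep the cleanup requirement visible, sketched here under assumptions, is to hand the resource out through an async context manager so that stop() always runs. FakeConsumer is a hypothetical stand-in for AIOKafkaConsumer so that the example runs without a broker, and open_consumer is likewise an invented helper name, not part of aiokafka.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeConsumer:
    """Hypothetical stand-in for AIOKafkaConsumer (not the real API)."""

    def __init__(self) -> None:
        self.running = False
        self._messages = ["a", "b"]

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    def __aiter__(self) -> "FakeConsumer":
        return self

    async def __anext__(self) -> str:
        if not self._messages:
            raise StopAsyncIteration
        return self._messages.pop(0)


@asynccontextmanager
async def open_consumer() -> AsyncIterator[FakeConsumer]:
    consumer = FakeConsumer()
    await consumer.start()
    try:
        yield consumer
    finally:
        # Runs even if the caller's loop body raises.
        await consumer.stop()


async def main() -> tuple[list[str], bool]:
    async with open_consumer() as consumer:
        received = [msg async for msg in consumer]
    # After the block exits, the consumer has been stopped.
    return received, consumer.running


received, still_running = asyncio.run(main())
```

With this shape, the caller still sees the concrete type inside the async with block, and cannot forget to call stop().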
Upvotes: 0
Reputation: 39546
The source code of the typing module leaves no doubt:
async def receive(self, topic: str) -> t.AsyncIterable:
or
async def receive(self, topic: str) -> t.AsyncIterator:
if you are sure it will strictly be an iterator.
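The difference between the two can be checked at runtime: an async iterable only needs __aiter__, while an async iterator also needs __anext__. A small sketch, with the class names Box and BoxIterator invented for illustration:

```python
from collections.abc import AsyncIterable, AsyncIterator


class BoxIterator:
    # Has both __aiter__ and __anext__: an async iterator.
    def __init__(self, items):
        self._items = list(items)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._items:
            raise StopAsyncIteration
        return self._items.pop(0)


class Box:
    # Only __aiter__: an async iterable, but not an async iterator.
    def __init__(self, items):
        self._items = items

    def __aiter__(self):
        return BoxIterator(self._items)


box = Box([1, 2])
box_is_iterable = isinstance(box, AsyncIterable)
box_is_iterator = isinstance(box, AsyncIterator)
iterator_is_iterator = isinstance(box.__aiter__(), AsyncIterator)
```

Every async iterator is also an async iterable, so AsyncIterable is the looser annotation of the two.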
Upvotes: 3