wvxvw
wvxvw

Reputation: 9465

What is the type of asynchronous iterator?

Example (question follows):

import asyncio
import typing as t

from aiokafka import AIOKafkaConsumer


class KafkaSimpleClient:

    ...

    async def receive(self, topic: str) -> ???:
        bootstrap_servers = ','.join(
            '{}:{}'.format(host, port)
            for host, port in self._bootstrap_servers
        )
        consumer = AIOKafkaConsumer(
            loop=asyncio.get_event_loop(),
            bootstrap_servers=bootstrap_servers,
            metadata_max_age_ms=5000,
        )
        consumer.subscribe(pattern=topic)
        await consumer.start()
        return consumer

Now, I'm struggling with the return type of receive (it returns something that can be iterated over with async for x in y. What is it? Is it an awaitable iterator? Is it an iterator over awaitables? Perhaps something else entirely?

Upvotes: 3

Views: 2038

Answers (2)

just-max
just-max

Reputation: 171

As of Python 3.9, typing.AsyncIterable and typing.AsyncIterator are deprecated aliases to their abstract base class (ABC) counterparts in collections.abc. The ABCs can also be subscripted with the type of the iterator's items, which allows the type of those items to be inferred when iterating.

From the docs for typing.AsyncIterable:

Deprecated since version 3.9: collections.abc.AsyncIterable now supports [].

Thus, using the new ABC types and subscripts, the return type of receive becomes:

import collections.abc as abc
from aiokafka.structs import ConsumerRecord

async def receive(self, topic: str) -> abc.AsyncIterable[ConsumerRecord]:
    ...

# or

async def receive(self, topic: str) -> abc.AsyncIterator[ConsumerRecord]:
    ...

(Where ConsumerRecord is the type of items supplied by the AIOKafkaConsumer iterator, as documented.)


However, the same docs for AIOKafkaConsumer tell us:

Failure to call AIOKafkaConsumer.stop() after consumer use will leave background tasks running.

Thus, in this specific example, the return type of receive should probably not abstract the concrete type, since the consumer must be aware that the return value is a resource that must be cleaned up and not just any iterator.

Upvotes: 0

Mikhail Gerasimov
Mikhail Gerasimov

Reputation: 39546

Source code of typing module leaves no doubts.

async def receive(self, topic: str) -> t.AsyncIterable:

or

async def receive(self, topic: str) -> t.AsyncIterator:

if you sure it'll be strictly iterator.

Upvotes: 3

Related Questions