yovel cohen

Reputation: 267

Python Faust: how to use take to process multiple messages at once

I'm trying to implement a Faust agent that uses stream.take() to process multiple messages in one batch.

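from datetime import timedelta

import faust

# VectorRecord is defined elsewhere in this module (not shown here).
app = faust.App('vectors-stream')
vector_topic = app.topic('vector', value_type=VectorRecord)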


@app.agent(vector_topic)
async def process_entities(stream: faust.streams.Stream):
    async for records in stream.take(max_=500, within=timedelta(seconds=5)):
        # yield await update_company_partition(records=records)
        yield print(len(records))
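
For reference, the batching behaviour I expect from take(max_=..., within=...) can be sketched in plain asyncio. This is only an illustration of the semantics as I understand them, not Faust's actual implementation: take_batches, its parameters, and the choice to start the time window when the first item of a batch arrives are all my own assumptions.

```python
import asyncio
from typing import AsyncIterator, List


async def take_batches(queue: asyncio.Queue, max_: int, within: float) -> AsyncIterator[List]:
    """Hypothetical helper: yield lists of up to max_ items, flushing after `within` seconds."""
    loop = asyncio.get_running_loop()
    while True:
        # Block until the first item of the next batch arrives.
        batch = [await queue.get()]
        # Assumption: the time window starts with the first item of the batch.
        deadline = loop.time() + within
        while len(batch) < max_:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
            except asyncio.TimeoutError:
                break  # Window elapsed: flush a partial batch.
        yield batch


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(20):
        queue.put_nowait(i)
    sizes = []
    async for batch in take_batches(queue, max_=8, within=0.05):
        sizes.append(len(batch))
        if sum(sizes) == 20:
            break
    return sizes


batch_sizes = asyncio.run(demo())
print(batch_sizes)  # → [8, 8, 4]
```

With 20 items already queued and max_=8, I would expect two full batches of 8 and a final batch of 4 that is flushed by the timeout. That is the kind of grouping I was hoping to see from the agent above.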

Now I'm trying to write a test to check that the behaviour is what I expect.

import asyncio
import random
from unittest import IsolatedAsyncioTestCase

import pytest

from app.data.kafka.consumer import process_entities, VectorRecord, app

class TestKafkaStream(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        app.conf.store = 'memory://'

    def generate_vector(self, dim: int):
        return [random.uniform(0.001, 1) for _ in range(dim)]

    @pytest.mark.asyncio()
    async def test_vectors_kafka_stream(self):
        async with process_entities.test_context() as agent:
            companies = ['se', 'spotify']
            for company in companies:
                for i in range(10):
                    _type = random.choice(['JobCascaded', 'UserHistory'])
                    msg = VectorRecord(company_slug=company, operation='upsert',
                                       vector=self.generate_vector(16), vector_type=_type, id=i)

                    await agent.put(msg)

But when I put a breakpoint on the yield print(len(records)) line, len(records) is always just 1, even though the test puts 20 messages.
