Reputation: 267
I'm trying to implement a faust agent using take to process multiple messages at the same time.
from datetime import timedelta

import faust

app = faust.App('vectors-stream')
vector_topic = app.topic('vector', value_type=VectorRecord)

@app.agent(vector_topic)
async def process_entities(stream: faust.streams.Stream):
    async for records in stream.take(max_=500, within=timedelta(seconds=5)):
        # yield await update_company_partition(records=records)
        yield print(len(records))
Now I'm trying to write a test, just to check that the behaviour is what I expect.
import asyncio
import random
from unittest import IsolatedAsyncioTestCase

import pytest

from app.data.kafka.consumer import process_entities, VectorRecord, app


class TestKafkaStream(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        app.conf.store = 'memory://'

    def generate_vector(self, dim: int):
        return [random.uniform(0.001, 1) for i in range(dim)]

    @pytest.mark.asyncio()
    async def test_vectors_kafka_stream(self):
        async with process_entities.test_context() as agent:
            companies = ['se', 'spotify']
            for company in companies:
                for i in range(10):
                    _type = random.choice(['JobCascaded', 'UserHistory'])
                    msg = VectorRecord(company_slug=company, operation='upsert',
                                       vector=self.generate_vector(16), vector_type=_type, id=i)
                    await agent.put(msg)
But when I put a breakpoint on the yield print(len(records)) line, len(records) is just 1 rather than a batch of messages.
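For reference, the batching behaviour expected from take(max_=..., within=...) can be sketched with plain asyncio. This is only an illustration of the intended semantics (collect up to max_ items, or flush whatever has arrived once the time window elapses), not faust's actual implementation; take_batches, demo and the queue are hypothetical names, and the timer here is assumed to start at the first item of each batch.

```python
import asyncio
from typing import AsyncIterator, List


async def take_batches(queue: asyncio.Queue, max_: int, within: float) -> AsyncIterator[List]:
    """Yield lists of up to max_ items; flush early once `within` seconds pass.

    Hypothetical helper, not part of faust.
    """
    loop = asyncio.get_running_loop()
    while True:
        # Block until at least one item is available, then start the window.
        batch = [await queue.get()]
        deadline = loop.time() + within
        while len(batch) < max_:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                break  # window elapsed: flush a partial batch
            batch.append(item)
        yield batch


async def demo() -> List[int]:
    # 20 queued messages, mirroring the 2 x 10 records put by the test.
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(20):
        queue.put_nowait(i)
    sizes = []
    async for batch in take_batches(queue, max_=8, within=0.1):
        sizes.append(len(batch))
        if queue.empty():
            break
    return sizes


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [8, 8, 4]
```

With messages already queued, the batches come out as 8, 8 and 4 items, which is the kind of grouping expected in the test instead of single-record batches.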
Upvotes: 2
Views: 718