Reputation: 3
I want to consume messages either from a Kafka broker or from a local file containing the data. How can I do this with Faust without writing a second, nearly identical function that skips Faust and just iterates over the messages in a plain for loop?
Or is it better to avoid Faust in this case? I'm still learning all of this and am not sure whether this should even be done.
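@app.agent(input_topic)
async def myagent(messages):
    # Iterate over the stream that Faust passes to the agent.
    async for item in messages:
        result = do_something(item)
        # Topic.send() takes keyword-only arguments in Faust.
        await output_topic.send(value=result)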
How do I modify this code block to be able to accept messages from a given file/list as well (depending on a config variable that will be set)? Or to send the messages from a file/list to the input topic?
Upvotes: 0
Views: 309
Reputation: 31
If you need stateful processing, Bytewax is a good alternative to Faust because its inputs are more flexible.
Upvotes: 0
Reputation: 191738
As you said, you don't need Faust. (Plus, it can't read files).
Use kafka-python, aiokafka, etc. Use open('file') like you would with any other file, read it, then produce data from it.
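A minimal sketch of that approach, assuming the file holds one JSON object per line. The file name messages.jsonl, the broker address localhost:9092, the topic name input_topic, and the helper names read_messages, replay and main are all placeholders I made up. Only KafkaProducer, send and flush come from kafka-python. The idea is to replay the file into the input topic so the existing agent can consume it unchanged:

```python
import json
from typing import Callable, Iterable, Iterator


def read_messages(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded message per non-blank line (JSON Lines is an assumption)."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def replay(lines: Iterable[str], send: Callable[[bytes], object]) -> int:
    """Pass every message to `send` as UTF-8 JSON bytes and return the count."""
    count = 0
    for message in read_messages(lines):
        send(json.dumps(message).encode("utf-8"))
        count += 1
    return count


def main(path: str = "messages.jsonl", topic: str = "input_topic") -> None:
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    # Hypothetical broker address; adjust to your setup.
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    with open(path, encoding="utf-8") as handle:
        sent = replay(handle, lambda payload: producer.send(topic, payload))
    producer.flush()  # block until buffered messages are delivered
    print(f"sent {sent} messages to {topic}")
```

Because replay only needs a callable, you can pass list.append (or your own do_something wrapper) instead of the producer when the config says to skip Kafka entirely, and call main() only when you want the file pushed to the broker.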
Upvotes: 0