Reputation: 1
I'm kind of new to faust, and I need my app to handle kakfa messages using different consumer groups. I need to have two different consumer groups in the same program, but since the consumer group for each kafka topic is the same as the faust app id, I need two apps running on the same program. My problem is that I don't know how to initialize the two apps in the command line. This doesn't work, for example: faust -A test_faust:app1,app2 worker This is an example of what I'm trying to do:
## Script test_faust.py
import faust
app1 = faust.App('consumer_group1', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
app2 = faust.App('consumer_group2', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
topic1 = app1.topic('topic1', value_type=str)
topic2 = app2.topic('topic2', value_type=str)
@app1.agent(topic1)
async def process1(stream):
async for value in stream:
print(f'App1: {value}')
@app2.agent(topic2)
async def process2(stream):
async for value in stream:
print(f'App2: {value}')
Any ideas how I can initialize both apps with the same command?Or alternatively, using the same app, how can i create a different consumer groups for each topic? Is there an option to do something like
topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1')
topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2')
Or any way to start the two apps in the script via the command line? I'm trying
faust -A test_faust:app1,app2 worker
but gives error
ValueError: Component 'app1,app2' of 'test_faust:app1,app2' is not a valid identifier
Thanks
Upvotes: 0
Views: 31
Reputation: 1
I found a solution. The key is to pass the two apps to the worker:
import asyncio
import nest_asyncio
nest_asyncio.apply()
async def start_worker(worker: faust.Worker) -> None:
await worker.start()
def manage_loop():
loop = asyncio.get_event_loop_policy().get_event_loop()
try:
worker = faust.Worker(*[app1, app2], loop=loop)
loop.run_until_complete(start_worker(worker))
loop.run_forever()
finally:
worker.stop_and_shutdown()
worker.close()
if __name__=='__main__':
manage_loop()
And then to start the app you need to call
python test_faust.py worker
Upvotes: 0