VMGA

Reputation: 1

How to create different consumer groups in the same faust app (using kafka and python)

I'm fairly new to faust, and I need my app to handle Kafka messages using two different consumer groups in the same program. Since the consumer group for every topic is the same as the faust app id, it seems I need two apps running in the same program. My problem is that I don't know how to start both apps from the command line. For example, this doesn't work: faust -A test_faust:app1,app2 worker. Here is an example of what I'm trying to do:

## Script test_faust.py
import faust

app1 = faust.App('consumer_group1', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')
app2 = faust.App('consumer_group2', broker='kafka://localhost:9092', value_serializer='json', consumer_auto_offset_reset='earliest')

topic1 = app1.topic('topic1', value_type=str)
topic2 = app2.topic('topic2', value_type=str)

@app1.agent(topic1)
async def process1(stream):
    async for value in stream:
        print(f'App1: {value}')

@app2.agent(topic2)
async def process2(stream):
    async for value in stream:
        print(f'App2: {value}')

Any ideas on how I can start both apps with the same command? Or alternatively, using a single app, how can I set a different consumer group for each topic? Is there an option to do something like this?

topic1 = app.topic('topic1', value_type=str, consumer_id='consumer_group1')
topic2 = app.topic('topic2', value_type=str, consumer_id='consumer_group2')

Or is there any way to start the two apps in the script from the command line? I tried

faust -A test_faust:app1,app2 worker

but it fails with this error:

ValueError: Component 'app1,app2' of 'test_faust:app1,app2' is not a valid identifier
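The error message suggests that the part after the colon must be a single Python identifier, so a comma-separated list is rejected. A minimal sketch of that kind of check follows; `split_app_spec` is a hypothetical helper written for illustration and is not faust's actual parsing code.

```python
from typing import Tuple


def split_app_spec(spec: str) -> Tuple[str, str]:
    """Split a 'module:attribute' string into its two parts (illustrative only)."""
    module_name, _, attribute = spec.partition(":")
    return module_name, attribute


module_name, attribute = split_app_spec("test_faust:app1,app2")

# 'app1,app2' contains a comma, so it is not a valid Python identifier.
print(attribute.isidentifier())
# A single name such as 'app1' is a valid identifier.
print("app1".isidentifier())
```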

Thanks

Upvotes: 0

Views: 31

Answers (1)

VMGA

Reputation: 1

I found a solution. The key is to pass the two apps to the worker:

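import asyncio

import faust
import nest_asyncio

# Patch asyncio so the event loop can be used in a nested way.
nest_asyncio.apply()

# app1 and app2 are the faust.App instances defined earlier in test_faust.py.

async def start_worker(worker: faust.Worker) -> None:
    await worker.start()

def manage_loop():
    loop = asyncio.get_event_loop_policy().get_event_loop()
    # Create the worker before the try block so `worker` is always bound in `finally`.
    # app1 is the worker's main app; app2 is passed as an additional service.
    worker = faust.Worker(*[app1, app2], loop=loop)
    try:
        loop.run_until_complete(start_worker(worker))
        loop.run_forever()
    finally:
        worker.stop_and_shutdown()
        worker.close()

if __name__ == '__main__':
    manage_loop()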

Then, to start both apps, run the script directly with Python instead of the faust CLI:

python test_faust.py worker
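Another option, which I have not tested against this setup, would be to keep using the faust CLI and run each app in its own worker process, since `-A` accepts a single app. The sketch below only builds the commands; `build_worker_commands` and `launch_workers` are hypothetical helper names, and it assumes the `faust` executable is on the PATH. Each worker gets its own `--web-port` so the built-in web servers do not collide on the default port 6066.

```python
import subprocess
from typing import List


def build_worker_commands(
    module: str, app_names: List[str], base_web_port: int = 6066
) -> List[List[str]]:
    """Build one `faust ... worker` command per app, each with a distinct web port."""
    commands = []
    for offset, name in enumerate(app_names):
        commands.append([
            "faust", "-A", f"{module}:{name}",
            "worker", "--web-port", str(base_web_port + offset),
        ])
    return commands


def launch_workers(commands: List[List[str]]) -> None:
    """Start every worker as a separate process and wait for all of them to exit."""
    processes = [subprocess.Popen(command) for command in commands]
    for process in processes:
        process.wait()


commands = build_worker_commands("test_faust", ["app1", "app2"])
for command in commands:
    print(" ".join(command))
```

Calling `launch_workers(commands)` would then start both workers. With this layout each app keeps its own id, and therefore its own consumer group, at the cost of running two processes.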

Upvotes: 0
