Proper way to resend messages to topic

I load messages from kafka topic to database. Loading to database can fail. Also I do not want to lose unsent messages.

App code:

import faust

app = faust.App('App', broker='kafka://localhost:9092')

source_topic = app.topic('source_topic')
failed_channel = app.channel()  # channel for unsent messages


@app.agent(source_topic)
async def process(stream):
    async for batch in stream.take(100_000, within=60):
        # here we have not info about partitions and keys
        # to reuse them when resending if sending failed
        try:
            pass  # send to database.  can fail
        except ConnectionError:
            for record in batch:
                # sending to channel is faster than sending to topic
                await failed_channel.send(value=record)


@app.agent(failed_channel)
async def resend_failed(stream):
    async for unsent_msg in stream:
        await source_topic.send(value=unsent_msg)

Maybe there is more standart way to handle such situations? Adding app.topic('source_topic', acks=False) works only after restarting app.

Upvotes: 1

Views: 1029

Answers (1)

Robin Moffatt
Robin Moffatt

Reputation: 32090

I load messages from kafka topic to database

Maybe there is more standart way to handle such situations

Yes - it's called Kafka Connect :-)

The standard pattern is to do any processing on your data and write it [back to] Kafka topics. Then you use the Kafka topic as a source for a Kafka Connect sink connector, in this case the Kafka Connect JDBC Sink connector.

Kafka Connect is part of Apache Kafka, and handles restarts, scaleout, failures, etc etc.

See also Kafka Connect in Action: JDBC Sink

Upvotes: 1

Related Questions