bigbounty

Reputation: 17368

Acknowledgement Kafka Producer Apache Beam

How do I get the records for which an acknowledgement was received in Apache Beam KafkaIO?

Basically, I want all the records for which I didn't get an acknowledgement to go to a BigQuery table so that I can retry them later. I used the following code snippet from the docs:

    pipeline
       .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)

       // The above four are required configuration. Returns PCollection<KafkaRecord<Long, String>>.

       // The rest of the settings are optional:

       // You can further customize the KafkaConsumer used to read the records by adding more
       // settings for ConsumerConfig, e.g.:
       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))

       // set event times and watermark based on LogAppendTime. To provide a custom
       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
       .withLogAppendTime()

       // restrict reader to committed messages on Kafka (see method documentation).
       .withReadCommitted()

       // offset consumed by the pipeline can be committed back.
       .commitOffsetsInFinalize()

       // finally, if you don't need Kafka metadata, you can drop it.
       .withoutMetadata() // PCollection<KV<Long, String>>
    )
    .apply(Values.<String>create()); // PCollection<String>

Upvotes: 0

Views: 557

Answers (1)

Alex Amato

Reputation: 1725

By default, Beam IOs are designed to keep attempting to write/read/process elements until they succeed. (Batch pipelines will fail after repeated errors.)

What you are referring to is usually called a Dead Letter Queue (DLQ): the failed records are taken and added to a PCollection, Pub/Sub topic, queuing service, etc. This is often desirable, as it allows a streaming pipeline to keep making progress (rather than block) when errors are encountered writing some records, while the ones which succeed are still written.
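For illustration, here is a minimal sketch of that pattern in the Beam Java SDK, using a DoFn with an extra output tag. Note that `input` and the `transform()` call are hypothetical placeholders for your own upstream PCollection and processing logic, not part of KafkaIO:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // Tags distinguishing the main output from the dead letter output.
    final TupleTag<String> successTag = new TupleTag<String>() {};
    final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

    PCollectionTuple results = input.apply("Process",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String record, MultiOutputReceiver out) {
            try {
              // transform() is a placeholder for whatever processing can fail.
              out.get(successTag).output(transform(record));
            } catch (Exception e) {
              // Instead of failing the bundle (and retrying indefinitely),
              // route the bad record to the dead letter output.
              out.get(deadLetterTag).output(record);
            }
          }
        }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

    PCollection<String> deadLetters = results.get(deadLetterTag);

This only catches failures in your own processing steps; failures inside the KafkaIO write itself are not observable this way, which is the gap discussed below.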

Unfortunately, unless I am mistaken, there is no dead letter queue implemented in KafkaIO. It may be possible to modify KafkaIO to support this; there was some discussion on the Beam mailing list, with ideas proposed for implementing it, which might be a useful starting point.

I suspect it may be possible to add this to KafkaWriter, catching the records that failed and outputting them to another PCollection. If you choose to implement this, please also contact the Beam community mailing list; if you would like help merging it into master, they will be able to help make sure the change covers the necessary requirements, so that it can be merged and makes sense for Beam as a whole.

Your pipeline can then write those records elsewhere (e.g. to a different sink). Of course, if that secondary sink simultaneously has an outage/issue, you would need another DLQ.
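For the BigQuery table the question mentions, a hedged sketch of writing the `deadLetters` PCollection from above; the table name is a placeholder, and the table is assumed to already exist with a single `payload` column:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptor;

    deadLetters
        // Wrap each failed record in a TableRow so BigQueryIO can write it.
        .apply("ToTableRow", MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via(record -> new TableRow().set("payload", record)))
        .apply("WriteDeadLetters", BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.kafka_dead_letters") // placeholder table
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

A later job can then read that table back and re-publish the payloads to Kafka.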

Upvotes: 1
