Jason Choi

Reputation: 143

Is it possible to reset the offsets of a topic for a Kafka consumer group in a Kafka connector?

My kafka sink connector reads from multiple topics (configured with 10 tasks) and processes upwards of 300 records from all topics. Based on the information held in each record, the connector may perform certain operations.

Here is an example of the key:value pair in a trigger record:

"REPROCESS":"my-topic-1"

Upon reading this record, I would then need to reset the offsets of the topic 'my-topic-1' to 0 in each of its partitions.

I have read in many places that creating a new KafkaConsumer, subscribing to the topic, then calling the seek(...) method once its partitions are assigned is the recommended way. For example,

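import java.util.Arrays;
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class MyTask extends SinkTask {

    // start(), stop(), version(), the JsonNode import, and the reprocessorProps
    // and deserializer fields are omitted for brevity.

    @Override
    public void put(Collection<SinkRecord> records) {
        records.forEach(record -> {
            // Guard against null keys before comparing
            if (record.key() != null && record.key().toString().equals("REPROCESS")) {
                reprocessTopicRecords(record);
            } else {
                // do something else
            }
        });
    }

    private void reprocessTopicRecords(SinkRecord record) {
        // The record value carries the name of the topic to reprocess
        KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
            new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
        reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // do offset reset here
                }
            }
        );
    }
}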

However, the above strategy does not work for my case because:

  1. It depends on a group rebalance taking place (which does not always happen)

  2. The 'partitions' passed to the onPartitionsAssigned method are dynamically assigned partitions, meaning they are only a subset of the full set of partitions that need to have their offsets reset. For example, this SinkTask will be assigned only 2 of the 8 partitions that hold the records for 'my-topic-1'.

I've also looked into using assign() but this is not compatible with the distributed consumer model (consumer groups) in the SinkConnector/SinkTask implementation.

I am aware that the kafka command line tool kafka-consumer-groups can do exactly what I want (I think): https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

To summarize, I want to reset the offsets of all partitions for a given topic using Java APIs and let the Sink Connector pick up the offset changes and continue to do what it has been doing (processing records).

Thanks in advance.

Upvotes: 8

Views: 26644

Answers (3)

OneCricketeer

Reputation: 191864

You're looking for the seek method. Either to an offset

consumer.seek(new TopicPartition("topic-name", partition), offset);

Or seekToBeginning
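As a minimal sketch of where seekToBeginning could go in the rebalance-listener approach from the question: the class name SeekToBeginningOnAssign is hypothetical, and this only moves the position of the consumer you hand it, not the position of Connect's own consumer.

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of Kafka: rewinds every partition that this
// consumer is handed during a rebalance.
final class SeekToBeginningOnAssign implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;

    SeekToBeginningOnAssign(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to clean up in this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Lazy: the actual offset lookup happens on the next poll() or position().
        consumer.seekToBeginning(partitions);
    }
}
```

It would be registered with something like consumer.subscribe(Arrays.asList("my-topic-1"), new SeekToBeginningOnAssign(consumer)) and only takes effect once poll() triggers an assignment, so it still covers just the partitions assigned to that one consumer.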

However, I feel like you'd be competing with the Connect Sink API's consumer group. In other words, assuming you set up the consumer with a separate group id, you're essentially consuming records from the source topic twice: once by Connect, and once by your own consumer instance.

Unless you explicitly seek Connect's own consumer instance as well (which is not exposed), you'd be getting into a weird state. For example, your task only executes on new records to the topic, despite the fact your own consumer would be looking at an old offset, or you'd still be getting even newer events while still processing old ones.

Also, you might eventually get a reprocess event at the very beginning of the topic (for example, once retention policies expire older records), causing your consumer to make no progress at all and to constantly rebalance its group by seeking to the beginning.

Upvotes: 3

Daniel Deng

Reputation: 470

We had to do a very similar offset resetting exercise.

KafkaConsumer.seek() combined with KafkaConsumer.commitSync() worked well.
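A minimal sketch of that seek-then-commit sequence, under the assumption that the Connect worker is stopped so the group has no active members (otherwise the commit is likely to be rejected). The class and method names are hypothetical, and groupId is whatever group the sink connector consumes with:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical helper class, not part of Kafka or Connect.
final class SeekAndCommitReset {

    /** Maps partition metadata to the TopicPartition handles that seek/commit expect. */
    static List<TopicPartition> toTopicPartitions(List<PartitionInfo> infos) {
        return infos.stream()
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .collect(Collectors.toList());
    }

    static void resetToBeginning(String bootstrapServers, String groupId, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<PartitionInfo> infos = consumer.partitionsFor(topic);
            if (infos == null || infos.isEmpty()) {
                throw new IllegalStateException("No partitions found for topic " + topic);
            }
            // assign() covers ALL partitions of the topic, not a rebalanced subset.
            List<TopicPartition> partitions = toTopicPartitions(infos);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // seekToBeginning is lazy; position() forces the lookup of the earliest offset.
            Map<TopicPartition, OffsetAndMetadata> offsets = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(consumer.position(tp))));
            consumer.commitSync(offsets);
        }
    }
}
```

Because it never polls, this consumer does not read any records itself; it only rewrites the committed offsets that the connector's consumers pick up when they rejoin the group.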

There is another option that is worth mentioning, if you are dealing with lots of topics and partitions (javadoc):

AdminClient.alterConsumerGroupOffsets(
  String groupId, 
  Map<TopicPartition,OffsetAndMetadata> offsets
)

We were lucky because we had the luxury to stop the Kafka Connect instance for a while, so there's no consumer group competing.
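The AdminClient option could be sketched roughly as follows. This assumes a client version that has listOffsets and alterConsumerGroupOffsets, that the consumer group is inactive at the time of the call, and that the caller already knows the partition count; AdminOffsetReset, earliestSpecs and resetToEarliest are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class, not part of Kafka or Connect.
final class AdminOffsetReset {

    /** Builds an "earliest offset" lookup for partitions 0..partitionCount-1 of the topic. */
    static Map<TopicPartition, OffsetSpec> earliestSpecs(String topic, int partitionCount) {
        Map<TopicPartition, OffsetSpec> specs = new HashMap<>();
        for (int p = 0; p < partitionCount; p++) {
            specs.put(new TopicPartition(topic, p), OffsetSpec.earliest());
        }
        return specs;
    }

    static void resetToEarliest(String bootstrapServers, String groupId,
                                String topic, int partitionCount)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (AdminClient admin = AdminClient.create(props)) {
            // Look up the earliest available offset instead of assuming 0,
            // since retention may already have removed the oldest records.
            Map<TopicPartition, ListOffsetsResultInfo> earliest =
                admin.listOffsets(earliestSpecs(topic, partitionCount)).all().get();

            Map<TopicPartition, OffsetAndMetadata> target = new HashMap<>();
            earliest.forEach((tp, info) -> target.put(tp, new OffsetAndMetadata(info.offset())));

            // Overwrites the group's committed offsets for every listed partition.
            admin.alterConsumerGroupOffsets(groupId, target).all().get();
        }
    }
}
```

Compared with seek() plus commitSync(), this needs no consumer instance or deserializers, which is why it tends to scale better across many topics and partitions.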

Upvotes: 2

Jason Choi

Reputation: 143

I was able to achieve resetting offsets for a kafka connect consumer group by using a series of Confluent's kafka-rest-proxy APIs: https://docs.confluent.io/current/kafka-rest/api.html

This implementation no longer requires the 'trigger record' approach first described in the original post and is purely REST API based.

  1. Temporarily delete the kafka connector (this deletes the connector's consumers and )

  2. Create a consumer instance for the same consumer group ("connect-")

  3. Have the instance subscribe to the requested topic you want to reset

  4. Do a dummy poll ('subscribe' is evaluated lazily)

  5. Reset consumer group topic offsets for the specified topic

  6. Do a dummy poll ('seek' is evaluated lazily), then commit the current offset state (in the proxy) for the consumer

  7. Re-create kafka connector (with same connector name) - after re-balancing, consumers will join the group and read the last committed offset (starting from 0)

  8. Delete the temporary consumer instance

If you are able to use the CLI, Steps 2-6 can be replaced with:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

As for those of you trying to do this in the kafka connector code through native Java APIs, you're out of luck :-(

Upvotes: 5
