Reputation: 5163
Using the new Kafka Java consumer api, I run a single consumer to consume messages. When all available messages are consumed, I kill it with kill -15
.
Now I would like to reset the offsets to start. I would like to avoid to just use a different consumer group. What I tried is the following sequence of calls, using the same group as the consumer that just had finished reading the data.
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));
I thought I had got this working in a test, but now I always just get:
ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
Is it in principle wrong to combine assign
with commitSync
, possibly because only subscribe
and commitSync
go together? The docs only say that assign
does not go along with subscribe
, but I thought this applies only in one consumer process. (In fact I was even hoping to run the offset-reset consumer while the other consumer is up, hoping that the other one might notice the offset change and start over again. But shutting it down first is fine too.)
Any ideas?
Upvotes: 3
Views: 4859
Reputation: 5163
Found the problem. The approach described in my question works well, given we respect the following conditions:
There may be no other consumer running with the targeted group.id
. Even if a consumer is subscribed only to other topics, this hinders committing topic offsets after calling assign()
instead of subscribe()
.
After the last other consumer has stopped, it takes 30 seconds (I think it is group.max.session.timeout.ms
) before the operation can succeed. The indicative log message from kafka is
Group X generation Y is dead and removed
Once this appears in the log, the sequence
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));
can succeed.
Upvotes: 3
Reputation: 5989
Why even commit offsets in the first place?
Set enable.auto.commit
to false
in Properties
and don't commit it at all if you just re-read all messages on restart.
To reset offset you can use for example these methods:
public void seek(TopicPartition partition, long offset)
public void seekToBeginning(TopicPartition... partitions)
Upvotes: 0