Reputation: 31
I have set up an experimental Kafka environment with 3 brokers and a topic with 3 partitions. I have a producer and a consumer, and I want to modify the offsets of a partition for a specific consumer. I read in the Kafka documentation that the consumer commit/fetch API can commit a specific offset or fetch the latest offset read by a consumer. Here is the link for the API:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
I used the code from that page to write my own code for fetching offsets for a specific consumer. However, the fetch API returns "-1" for the requested offset. Here is the example code:
I also read in the first link that "if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1."
However, I have produced some messages, and my consumer has consumed them and printed the offset of each message it read.
I would be really grateful if anyone could help with this. I want to know which part of my code is wrong, or whether there is something wrong with the API. Please don't hesitate to leave any useful comments. My code is exactly like the code in the link I provided, but if you need to see it, tell me and I will post it here.
The kafka version is 0.10.2.0
The config of my Kafka is:
Broker 1: port 9093
Broker 2: port 9094
Broker 3: port 9095
Topic: "testpic3"
......................
Consumer Config:
props.put("group.id", "test");
props.put("client.id", "MyConsumer");
................
here is my code:
import java.util.ArrayList;
import java.util.List;

import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;

public class KafkaOffsetManage {
    public static void main(String[] args) {

        BlockingChannel channel = new BlockingChannel("localhost", 9095,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();

        final String MY_GROUP = "test";
        final String MY_CLIENTID = "MyConsumer";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3", 1);
        final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3", 2);

        channel.send(new ConsumerMetadataRequest(MY_GROUP,
                ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse =
                ConsumerMetadataResponse.readFrom(channel.receive().buffer());
        System.out.println("+++++++++++++++++++++++++++");
        System.out.println(metadataResponse.errorCode());

        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different from the above channel's host, then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                    BlockingChannel.UseDefaultBufferSize(),
                    BlockingChannel.UseDefaultBufferSize(),
                    5000 /* read timeout in millis */);
            channel.connect();
            System.out.println("Connected to Offset Manager");
            System.out.println(offsetManager.host() + ", Port:" + offsetManager.port());
        } else {
            // retry (after backoff)
        }

        // How to fetch offsets
        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        partitions.add(testPartition0);
        //partitions.add(testPartition1);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                MY_GROUP,
                partitions,
                (short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
                correlationId,
                MY_CLIENTID);

        try {
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse =
                    OffsetFetchResponse.readFrom(channel.receive().buffer());
            OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
            short offsetFetchErrorCode = result.error();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                channel.disconnect();
                // Go to step 1 and retry the offset fetch
            } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
                // retry the offset fetch (after backoff)
            } else {
                long retrievedOffset = result.offset();
                String retrievedMetadata = result.metadata();
                System.out.println("The retrieved offset is:" + Long.toString(retrievedOffset));
                System.out.println(retrievedMetadata);
                System.out.println(result.toString());
            }
        } catch (Exception e) {
            channel.disconnect();
            // Go to step 1 and then retry offset fetch after backoff
        }
    }
}
The output of the code is here:
+++++++++++++++++++++++++++
0
Connected to Offset Manager
user-virtual-machine, Port:9093
------------------------
The retrieved offset is:-1
OffsetMetadataAndError[-1,,3]
Process finished with exit code 0
One strange thing is the Kafka dependency. When I add this dependency, some classes in my program are not recognized:
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
The classes "ConsumerMetadataRequest" and "ConsumerMetadataResponse" are not recognized.
So I added this dependency instead:
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
Thank you,
Upvotes: 3
Views: 8812
Reputation: 79
This happens because of offset expiration. Two settings in Kafka control this behaviour. The first is the "retention.ms" setting of the "__consumer_offsets" topic; it should be set to -1 to disable expiration of records inside that topic. I assume you are using Kafka 1.1.x. Check the topic configuration with:
$ ./kafka-configs.sh --entity-type topics \
--entity-name __consumer_offsets \
--zookeeper localhost:2181 \
--describe
Configs for topic '__consumer_offsets' are compression.type=producer,cleanup.policy=compact,min.insync.replicas=2,segment.bytes=104857600,retention.ms=-1,unclean.leader.election.enable=false
If the configuration differs, change it with:
$ ./kafka-configs.sh --entity-type topics \
--entity-name __consumer_offsets \
--zookeeper localhost:2181 \
--alter \
--add-config retention.ms=-1
With the retention policy set up, the next step is to check whether there are any committed offsets in the topic. By default Kafka does not allow reading internal topics. To change this behaviour, create a file with consumer settings:
$ echo exclude.internal.topics=false > consumer.properties
After that, read the "__consumer_offsets" topic with:
$ ./kafka-console-consumer.sh --consumer.config consumer.properties \
--from-beginning \
--topic __consumer_offsets \
--zookeeper localhost:2181 \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
If there is anything in the topic, the output will look like this:
[test_client,Test.Purposes,2]::[OffsetMetadata[13,NO_METADATA],CommitTime 1534165245681,ExpirationTime 1534251645681]
[test_client,Test.Purposes,0]::[OffsetMetadata[14,NO_METADATA],CommitTime 1534165245776,ExpirationTime 1534251645776]
[test_client,Test.Purposes,1]::[OffsetMetadata[8,NO_METADATA],CommitTime 1534165690946,ExpirationTime 1534252090946]
Here the ExpirationTime value is what matters. When loading offsets, the group coordinator reads only unexpired records, i.e. those with now() < ExpirationTime, and only these values are returned for a client's offset fetch request.
The ExpirationTime is calculated when the client commits offsets, using the formula:
ExpirationTime = CommitTime + offsets.retention.minutes
The "offsets.retention.minutes" setting is broker-level and defaults to 1440 (24 hours). Decoding the CommitTime and ExpirationTime from the command output, we see:
$ date -d @1534165245
Mon Aug 13 16:00:45 UTC 2018
$ date -d @1534251645
Tue Aug 14 16:00:45 UTC 2018
which are exactly 24 hours.
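The same arithmetic can be checked in plain Java; the timestamps below are the ones from the sample topic dump above:

```java
import java.time.Duration;
import java.time.Instant;

public class ExpirationCheck {
    public static void main(String[] args) {
        long commitTimeMs = 1534165245681L;     // CommitTime from the topic dump
        long expirationTimeMs = 1534251645681L; // ExpirationTime from the topic dump

        // ExpirationTime = CommitTime + offsets.retention.minutes (default 1440)
        long retentionMs = Duration.ofMinutes(1440).toMillis();
        System.out.println(commitTimeMs + retentionMs == expirationTimeMs); // prints "true"

        System.out.println(Instant.ofEpochMilli(commitTimeMs));     // 2018-08-13T16:00:45.681Z
        System.out.println(Instant.ofEpochMilli(expirationTimeMs)); // 2018-08-14T16:00:45.681Z
    }
}
```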
So the solution to the incorrect-offsets problem is to increase the "offsets.retention.minutes" setting, keeping in mind that this affects broker memory usage when there are many dead consumer groups in the system, and also to periodically commit unchanged offsets to extend their expiration time.
Upvotes: 2
Reputation: 336
I assume you've added
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
</dependency>
as your dependency. This is Kafka itself. What you need for consuming from and producing to Kafka 0.10.2 is:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
For consuming (and manipulating the offsets of a given consumer) use the class KafkaConsumer from that artifact; it has detailed javadoc and is more convenient than the "Committing and fetching consumer offsets in Kafka" wiki example.
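As a minimal sketch of that approach (broker ports, group id and topic name taken from the question; this needs the kafka-clients artifact on the classpath and a running broker):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            for (int partition = 0; partition < 3; partition++) {
                TopicPartition tp = new TopicPartition("testpic3", partition);
                // committed() returns null when the group has no offset for this partition
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println(tp + " -> "
                        + (committed == null ? "no committed offset" : committed.offset()));
            }
        }
    }
}
```

To move the group's position, you can assign() the partition, seek() to the desired offset, and commitSync().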
Apart from that, if you still want to use the code from the example you linked, your problem may be here:
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
You add only one partition, and it is possible that there are no messages on that partition (you have 3 partitions, so the messages you sent may have gone to the other two). In Kafka each partition is separate, and a consumer group has a different offset for each partition.
Upvotes: -1