aayush456

Reputation: 23

Reading from a certain kafka offset in storm topology

I'm trying to get my storm topology to read from a certain kafka offset on restart.

If I understand correctly, I can do this by setting ignoreZkOffsets to true and setting startOffsetTime, but this hasn't worked so far.

I've tried setting startOffsetTime to System.currentTimeMillis() - 60000L to start from one minute earlier, and I've also tried setting it to the current offsets.

Upvotes: 0

Views: 785

Answers (2)

Qilong Su

Reputation: 11

Your understanding of ignoreZkOffsets is partly right: setting this option to true makes the spout bypass the offsets stored in ZooKeeper. However, startOffsetTime is not meant to be an arbitrary Unix timestamp. Its default is initialized as follows:

public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

The Kafka API offers only two special values for this setting, EarliestTime and LatestTime. Any other timestamp is resolved only at log-segment granularity, so this approach cannot start the spout at a precise offset.

If you know the offset value, you can instead modify the offset that storm-kafka stores in ZooKeeper. It is stored at the ZooKeeper path ${ZKRoot}/${ClientId}/${KafkaPartitionId}, where ${ClientId} is the id you specified in SpoutConfig and ${KafkaPartitionId} is 0 if the topic has only one partition.
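To make the path concrete, here is a small sketch that assembles it from the SpoutConfig values. The helper name is hypothetical, and one detail is an assumption from my reading of storm-kafka's source: the last path segment appears to be written as `partition_<N>` rather than the bare partition number, so list the children of your zkRoot in ZooKeeper to confirm before relying on it.

```java
// Hypothetical helper: builds the ZooKeeper path under which storm-kafka is
// assumed to store the committed offset for one partition.
class ZkOffsetPath {

    // zkRoot and id are the values passed to SpoutConfig. The "partition_"
    // prefix is an assumption; check the existing nodes under zkRoot.
    static String zkPath(String zkRoot, String id, int partition) {
        // Avoid a double slash if zkRoot was configured with a trailing "/".
        String root = zkRoot.endsWith("/") ? zkRoot.substring(0, zkRoot.length() - 1) : zkRoot;
        return root + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Example values only; use whatever your SpoutConfig actually contains.
        System.out.println(zkPath("/kafkastorm", "my-spout", 0));
    }
}
```

Because the id is part of the path, it has to be the same string on every restart, otherwise the spout looks under a different node and never sees the offset you wrote.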

Once you find this node, set its value to the offset you want and restart your topology; it will then start reading from that offset. If the path doesn't exist, you can create it manually.
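As far as I can tell, the node does not hold a bare number but a small JSON document, and the spout reads at least the `topology.id` and `offset` fields from it. The sketch below builds such a document; the field names mirror what storm-kafka appears to write on commit, but treat them as an assumption. If a node already exists, copying its JSON and changing only `offset` is the safer route.

```java
// Hypothetical sketch: builds the JSON to write into the partition node.
// Field names are assumed to match what storm-kafka writes on commit.
class OffsetPayload {

    // No JSON escaping is done: the values are assumed to contain no quotes or backslashes.
    static String payload(String topologyId, String topologyName, long offset, int partition,
                          String brokerHost, int brokerPort, String topic) {
        return "{\"topology\":{\"id\":\"" + topologyId + "\",\"name\":\"" + topologyName + "\"},"
                + "\"offset\":" + offset + ","
                + "\"partition\":" + partition + ","
                + "\"broker\":{\"host\":\"" + brokerHost + "\",\"port\":" + brokerPort + "},"
                + "\"topic\":\"" + topic + "\"}";
    }

    public static void main(String[] args) {
        // Example values only; the topology id here is a placeholder.
        System.out.println(payload("hypothetical-topology-id", "my-topology", 12345L, 0,
                "broker1.example.com", 9092, "events"));
    }
}
```

The resulting string can then be written with ZooKeeper's zkCli.sh, using `set <path> <data>` for an existing node or `create <path> <data>` for a new one (parent nodes must already exist for `create`).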

One pitfall of this solution is that you need to know your ClientId, which means you cannot use a random UUID as the id, as suggested in the storm-starter demo.

Upvotes: 1

Bishnu

Reputation: 423

From the Kafka FAQ: "Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest (from the end of the topic) and earliest (from the beginning of the topic). For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes."

Source: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?
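To see why "one minute ago" does not land one minute back, here is a toy model of the lookup the FAQ describes. It is not the broker's implementation (the real code differs in detail, for example in which segment timestamp it compares against); it only shows the segment-granularity effect. The -1/-2 constants match the values of `kafka.api.OffsetRequest.LatestTime()` and `EarliestTime()`.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Toy model of the FAQ's description of time-based offset lookup.
class SegmentOffsetLookup {

    static final long LATEST = -1L;   // same value as OffsetRequest.LatestTime()
    static final long EARLIEST = -2L; // same value as OffsetRequest.EarliestTime()

    // segments: segment timestamp (ms) -> base offset of that segment.
    // Returns empty when no segment is old enough for the requested time.
    static OptionalLong offsetBefore(TreeMap<Long, Long> segments, long logEndOffset, long timestamp) {
        if (timestamp == LATEST) {
            return OptionalLong.of(logEndOffset);
        }
        if (timestamp == EARLIEST) {
            return OptionalLong.of(segments.firstEntry().getValue());
        }
        // Latest segment whose timestamp is not after the requested time.
        Map.Entry<Long, Long> segment = segments.floorEntry(timestamp);
        return segment == null ? OptionalLong.empty() : OptionalLong.of(segment.getValue());
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> segments = new TreeMap<>();
        segments.put(1_000L, 0L);   // segment starting at offset 0
        segments.put(2_000L, 500L); // segment starting at offset 500
        segments.put(3_000L, 900L); // segment starting at offset 900
        long logEnd = 1_200L;

        // Any timestamp from 2000 to 2999 resolves to the same offset, 500.
        System.out.println(offsetBefore(segments, logEnd, 2_500L).getAsLong());
    }
}
```

With large segments, every timestamp inside a segment collapses to that segment's first offset, which is why a startOffsetTime of "now minus 60 seconds" can replay far more than a minute of data.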

If you know the offset from which your application should start consuming messages, set it in ZooKeeper and make sure ignoreZkOffsets is false; when it is true, a newly submitted topology ignores the stored offset and falls back to startOffsetTime.
FYI: the ZooKeeper node lives under the path you specified as the zkRoot property when configuring the spout.
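The interaction between a hand-written offset and ignoreZkOffsets is easy to get backwards. The sketch below is a simplified, hypothetical model of how the spout picks its starting offset when a partition is assigned, based on my reading of storm-kafka's PartitionManager; names are illustrative, and other checks in the real code (such as maxOffsetBehind) are left out.

```java
// Simplified, hypothetical model of the spout's start-offset decision.
class StartOffsetChoice {

    // storedOffset / storedTopologyId: what was read from the ZooKeeper node (null if missing).
    // currentTopologyId: id of the running topology instance (new on every submit).
    // offsetFromStartOffsetTime: offset Kafka returned for SpoutConfig.startOffsetTime.
    static long chooseStart(Long storedOffset, String storedTopologyId, String currentTopologyId,
                            boolean ignoreZkOffsets, long offsetFromStartOffsetTime) {
        if (storedOffset == null || storedTopologyId == null) {
            // Nothing usable in ZooKeeper: fall back to startOffsetTime.
            return offsetFromStartOffsetTime;
        }
        if (ignoreZkOffsets && !storedTopologyId.equals(currentTopologyId)) {
            // Resubmitted topology with ZK offsets ignored: fall back to startOffsetTime.
            return offsetFromStartOffsetTime;
        }
        // Otherwise the stored (possibly hand-written) offset wins.
        return storedOffset;
    }

    public static void main(String[] args) {
        System.out.println(chooseStart(12345L, "old-id", "new-id", false, 0L));
        System.out.println(chooseStart(12345L, "old-id", "new-id", true, 0L));
    }
}
```

In this model a resubmitted topology only honours the offset you wrote when ignoreZkOffsets is false; with it set to true, the stored value is skipped in favour of startOffsetTime.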

Hope this helps you.

Upvotes: 1
