Franck Lefebure

Reputation: 49

Kafka offset persistence in Zookeeper

I'm new to Storm/Kafka. I've been able to configure a basic working prototype: I can produce messages and consume them from a Storm topology.

I have a question regarding Kafka offset persistence.

Initially I couldn't find the consumer group used by the Java client when using the Kafka utilities.

After some searching I read that storm-kafka stores this offset in ZooKeeper. With my storm-kafka configuration:

I can then retrieve the offset with the ZooKeeper zkCli.sh script:

get /my_root/my_group/partition_0
==> "topology":{},"offset":3148,..., "topic":"rawdatas"
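That ZooKeeper path is derived from the spout configuration. A minimal sketch of the wiring, assuming storm-kafka's SpoutConfig and a placeholder ZooKeeper connect string:

```java
// Sketch: how zkRoot and the spout id map to the ZooKeeper offset path.
// "localhost:2181" is a placeholder for the real ZooKeeper connect string.
BrokerHosts hosts = new ZkHosts("localhost:2181");

// zkRoot = "/my_root", id = "my_group" => offsets are persisted under
// /my_root/my_group/partition_<n>, one node per Kafka partition
SpoutConfig spoutConfig = new SpoutConfig(hosts, "rawdatas", "/my_root", "my_group");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```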

My problem is that I don't understand how, and how often, this offset is updated. In the Storm bolt I make sure to ack every processed tuple.

When the topology starts and tuples begin to be processed, I can see a small jump of the offset in ZooKeeper (e.g. a few dozen). Then the offset doesn't move for a long time.

Sometimes I can see a larger jump (e.g. a few thousand), but it seems random. From the default storm-kafka configuration, I understand that the offset should be updated every 2 seconds:

// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
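For reference, stateUpdateIntervalMs is a public field on SpoutConfig, so the commit interval can be set directly. A sketch, with placeholder host and the same topic/zkRoot/id names as above:

```java
// Sketch: overriding the offset-commit interval on storm-kafka's SpoutConfig
SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts("localhost:2181"),   // placeholder ZK connect string
        "rawdatas", "/my_root", "my_group");
spoutConfig.stateUpdateIntervalMs = 2000; // write current offsets to ZooKeeper every 2s
```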

Am I missing something?

Franck

Upvotes: 0

Views: 1194

Answers (1)

Franck Lefebure

Reputation: 49

After some diving into the kafka-spout code I understand my problem better.

This post also helped me: http://www.developer.com/open/addressing-internal-apache-storm-buffers-overflowing.html

As usual, it's all a matter of configuration.

In my sample topology, I have a kafka-spout which emits tuples to a simple single-threaded bolt doing "intense computation", which we can emulate with a simple Thread.sleep(1000).
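The bolt in question can be sketched as a minimal BaseRichBolt (the class name is illustrative; the sleep stands in for the real work):

```java
public class SlowBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            Thread.sleep(1000); // emulate the "intense computation"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        collector.ack(tuple); // every processed tuple is acked, as described above
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output stream declared
    }
}
```

Because execute() handles one tuple per second, the bolt drains tuples far more slowly than the spout emits them, which is what triggers the behavior below.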

Storm limits how many emitted tuples can be alive in the topology at once (512 by default). There is also a timeout between the spout emitting a tuple and its final ack, which is 30 seconds by default.

What matters, I think, is:

  1. On topology start, the first 512 tuples are emitted and start being processed by the bolt.
  2. After ~30 seconds, the spout starts to receive fail() on some tuples due to the timeout. These tuples are added to another Storm queue to be replayed.
  3. The default replay policy implies a lot of replay attempts.
  4. As long as a tuple is waiting to be replayed, no higher offset can be committed to ZooKeeper. That's why I couldn't see the offset move.
  5. After some time, the replay queue saturates the heap and the process hangs.

In my case, I just had to tune maxSpoutPending and messageTimeoutSecs to get a good flow in the topology:

Config conf = new Config();
conf.setMaxSpoutPending(50);     // at most 50 un-acked tuples in flight
conf.setMessageTimeoutSecs(120); // allow 120s before a tuple is failed and replayed
StormTopology topology = builder.createTopology();

Franck

Upvotes: 1
