Reputation: 49
I'm new to Storm and Kafka. I've been able to set up a basic working prototype with:
I can produce messages and consume them from a Storm topology.
I have a question about how Kafka offsets are persisted.
Initially, I couldn't find the consumer group used by the Java client when using the Kafka utilities.
After some searching, I read that storm-kafka stores this offset in ZooKeeper. If my storm-kafka configuration is:
then I can retrieve the offset with ZooKeeper's zkCli.sh script:
get /my_root/my_group/partition_0
==> "topology":{},"offset":3148,..., "topic":"rawdatas"
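The node above is addressed by the spout's ZooKeeper root and its id rather than by a Kafka consumer group, which likely explains why the Kafka utilities don't show it. A minimal sketch of that path layout, assuming /my_root and my_group are the zkRoot and id values given to storm-kafka's SpoutConfig (the class and method names are hypothetical, for illustration only):

```java
public class OffsetPathSketch {
    // Assumed layout of the storm-kafka node holding a partition's committed offset:
    // <zkRoot>/<spout id>/partition_<n>
    static String committedOffsetPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // "/my_root" and "my_group" come from the zkCli.sh path above and are
        // assumed to be the zkRoot and id of the spout configuration.
        System.out.println(committedOffsetPath("/my_root", "my_group", 0));
    }
}
```

One node per partition means a multi-partition topic would have partition_1, partition_2, and so on next to partition_0.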
My problem is that I don't understand how, and how often, this offset is updated. In the Storm bolt, I make sure to ack every processed tuple.
When the topology starts and tuples begin to be processed, I see a small jump of the offset in ZooKeeper (e.g. a few dozen). Then the offset doesn't move for a long time.
Sometimes I see a larger jump (e.g. a few thousand), but it seems random. Looking at the default storm-kafka configuration, I understand that the offset should be saved every 2 seconds:
// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
Am I missing something?
Franck
Upvotes: 0
Views: 1194
Reputation: 49
After some digging in the kafka-spout code, I understand my problem better.
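The key point is which offset gets written every stateUpdateIntervalMs, not how often. A simplified model of one partition, assuming (per my reading of storm-kafka) that the spout commits the smallest offset still waiting for an ack, or the next offset to emit when nothing is pending. The class below is hypothetical and is not the real storm-kafka PartitionManager:

```java
import java.util.TreeSet;

public class CommittedOffsetSketch {
    // Offsets emitted but not yet acked, kept sorted so the oldest is first.
    private final TreeSet<Long> pending = new TreeSet<>();
    private long emittedTo = 0; // next offset the spout would emit

    long emit() {
        long offset = emittedTo++;
        pending.add(offset);
        return offset;
    }

    void ack(long offset) {
        pending.remove(offset);
    }

    // Value assumed to be saved to ZooKeeper on each state update:
    // everything below the oldest un-acked tuple is safe to skip on restart.
    long committedOffset() {
        return pending.isEmpty() ? emittedTo : pending.first();
    }

    public static void main(String[] args) {
        CommittedOffsetSketch p = new CommittedOffsetSketch();
        for (int i = 0; i < 5; i++) p.emit();      // offsets 0..4 in flight
        p.ack(1); p.ack(2); p.ack(3); p.ack(4);    // all acked except offset 0
        System.out.println(p.committedOffset());   // 0: held back by the slow tuple
        p.ack(0);
        System.out.println(p.committedOffset());   // 5: jumps past the whole batch
    }
}
```

Under this model a single slow or repeatedly failing tuple pins the committed offset, and once it is finally acked the offset leaps forward, which matches the "stuck, then a big jump" pattern in the question.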
This post helped me too: http://www.developer.com/open/addressing-internal-apache-storm-buffers-overflowing.html
As usual, it's all a matter of configuration.
In my sample topology, a kafka-spout emits tuples to a simple single-threaded bolt that does "intense computation", which can be emulated with a simple Thread.sleep(1000).
Storm limits how many emitted tuples can be pending (emitted but not yet acked) in the topology at once (default 512). There is also a timeout between the spout emitting a tuple and its final ack, which is 30 seconds by default.
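A back-of-the-envelope model of why these two limits clash with a slow bolt. It assumes the spout fills its whole pending window at once and a single bolt executor takes 1 s per tuple (the Thread.sleep(1000) above); it ignores replays and the coarse-grained way Storm actually checks timeouts. The class and method names are hypothetical:

```java
public class PendingTimeoutSketch {
    // Tuple i (0-based) waits behind i others, so it is acked after (i + 1) processing slots.
    // If that is later than the message timeout, Storm has already failed it.
    static int timedOutInFirstBatch(int maxPending, int timeoutSecs, long processingMs) {
        int timedOut = 0;
        for (int i = 0; i < maxPending; i++) {
            long ackedAtMs = (i + 1) * processingMs;
            if (ackedAtMs > timeoutSecs * 1000L) {
                timedOut++;
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        System.out.println(timedOutInFirstBatch(512, 30, 1000)); // values quoted above
        System.out.println(timedOutInFirstBatch(50, 120, 1000)); // tuned values
    }
}
```

With 512 pending and a 30 s timeout, only the first 30 tuples are acked in time and the other 482 are failed and replayed, so the committed offset barely moves. With 50 pending and a 120 s timeout, the last tuple is acked after about 50 s and nothing times out.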
What matters, I think, is:
In my case, I just had to tune maxSpoutPending and messageTimeoutSecs to get tuples flowing smoothly through the topology:
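StormTopology topology = builder.createTopology();
Config conf = new Config();
conf.setMaxSpoutPending(50);     // at most 50 un-acked tuples in flight per spout task
conf.setMessageTimeoutSecs(120); // a tuple not fully acked within 120 s is failed and replayed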
Franck
Upvotes: 1