Reputation: 368
I integrated Apache Storm 1.0.3 with Apache Kafka 0.10.1.0 (kafka_2.11-0.10.1.0). Storm reads one or two messages from the Kafka topic correctly, but when the first bolt acks the tuple, the ack does not show up in the Storm UI. What is the problem?
Another question: when Storm has read between ten and nineteen messages from the Kafka topic, the Storm UI shows 20 acked for the bolt; if it reads another group of 19 messages, the acked count increases by 20 more. I don't understand why the Storm UI shows the acked counts of the spout and the bolts in steps of 20. Could anyone explain the logic behind how acked and failed tuples are recorded in the Storm UI?
The configuration of my topology is:
final TopologyBuilder myTopology = new TopologyBuilder();
KafkaConfiguration kconfig = new KafkaConfiguration();
SpoutConfig spout = kconfig.getKafkaConfiguration(args[0], args[1], args[2], args[3]);
myTopology.setSpout("spoutMvClient", new KafkaSpout(spout), 5);
myTopology.setBolt("boltTransformToObject", new TransformBolt(), 7)
        .globalGrouping("spoutMvClient");
myTopology.setBolt("boltMVClient", new MvClientBolt(), 6)
        .fieldsGrouping("boltTransformToObject", new Fields("objectTarget"));
Config conf = new Config();
conf.setMaxSpoutPending(5000);
try {
    StormSubmitter.submitTopology("topologyOne", conf, myTopology.createTopology());
} catch (AlreadyAliveException e) {
    e.printStackTrace();
}
My first bolt, TransformBolt, is:
public void execute(Tuple input) {
    try {
        LOG.info(input.getString(0));
        Transform transform = new Transform();
        OpenTarget openTarget = transform.getObjetGenericFromFileXml(input.getString(0));
        // Anchor the emitted tuple to the input, then ack the input
        collector.emit(input, new Values(openTarget));
        collector.ack(input);
    } catch (Exception e) {
        LOG.error(e.getMessage());
        collector.fail(input);
    }
}
Upvotes: 0
Views: 670
Reputation: 368
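After investigating, I understood how Storm counts acked tuples and shows them in the Storm UI. A default configuration setting controls the acked counts shown in the Storm UI: topology.stats.sample.rate, which defaults to 0.05.
With this setting Storm only samples 5% of the data flow: roughly one tuple in every 20 is measured, and each sampled tuple adds 20 to the counter, which is why the counts grow in steps of 20. If we need the exact number of tuples acked or failed by a specific spout or bolt, we must change this configuration to: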
conf.setStatsSampleRate(1.0d);
For example:
Config conf = new Config();
conf.setMaxSpoutPending(5000);
conf.setStatsSampleRate(1.0d);
try {
    StormSubmitter.submitTopology("topologyOne", conf, myTopology.createTopology());
} catch (AlreadyAliveException e) {
    e.printStackTrace();
}
After that, the Storm UI counts and shows the correct number of acked tuples in the topology.
Finally, here is another question similar to mine: Storm latency caused by ack
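To illustrate why a sampled counter moves in steps of 20, here is a small standalone simulation. It is only a sketch and not Storm's actual code: the class SampledAckCounter, its methods, and the fixed random seed are hypothetical names invented for this example, and the sketch assumes that one randomly chosen tuple in each window of 1/rate tuples is sampled and counted with a weight of 1/rate.

```java
import java.util.Random;

// Hypothetical illustration only; this is NOT Storm's implementation.
public class SampledAckCounter {
    private final int window;     // 1 / sampleRate, e.g. 20 for a rate of 0.05
    private final Random random;
    private int position = 0;     // index of the next ack inside the current window
    private int target;           // the single index in this window that is sampled
    private long reported = 0;    // the value a UI would display

    public SampledAckCounter(double sampleRate, long seed) {
        this.window = (int) Math.round(1.0 / sampleRate);
        this.random = new Random(seed);
        this.target = random.nextInt(window);
    }

    // Called once per acked tuple; only the sampled one changes the counter.
    public void ack() {
        if (position == target) {
            reported += window;   // one sampled tuple stands in for a whole window
        }
        position++;
        if (position == window) { // start a new window with a new random target
            position = 0;
            target = random.nextInt(window);
        }
    }

    public long reported() {
        return reported;
    }

    // Convenience helper: the reported count after a given number of acks.
    public static long reportedAfter(double sampleRate, int acks, long seed) {
        SampledAckCounter counter = new SampledAckCounter(sampleRate, seed);
        for (int i = 0; i < acks; i++) {
            counter.ack();
        }
        return counter.reported();
    }

    public static void main(String[] args) {
        for (int acks : new int[] {2, 10, 19, 20, 39, 40}) {
            System.out.println(acks + " real acks, rate 0.05 -> reported "
                    + reportedAfter(0.05, acks, 42L));
        }
        System.out.println("7 real acks, rate 1.0 -> reported "
                + reportedAfter(1.0, 7, 42L));
    }
}
```

Under these assumptions, a rate of 0.05 always reports a multiple of 20, so a couple of acks may show up as 0 or as 20, while a rate of 1.0 reports every ack individually. That matches the behaviour described in the question.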
Upvotes: 1