Reputation: 349
I'm testing a simple topology to check Kafka spout performance. It contains a Kafka spout and a bolt that acknowledges each tuple. The bolt's execute method:
public void execute(Tuple input) {
    collector.ack(input);
}
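For completeness, the whole bolt is just that ack. Roughly, the full class looks like this (everything around execute is my assumption; imports use the backtype.storm namespace of Storm 0.9/0.10, org.apache.storm on 1.x):
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class SimpleAckerBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        collector.ack(input);  // ack immediately; the bolt does no real work
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams
    }
}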
The topology looks like this:
protected void configureTopology(TopologyBuilder topologyBuilder) {
    configureKafkaCDRSpout(topologyBuilder);
    configureKafkaSpoutBandwidthTesterBolt(topologyBuilder);
}

private void configureKafkaCDRSpout(TopologyBuilder builder) {
    KafkaSpout kafkaSpout = new KafkaSpout(createKafkaCDRSpoutConfig());
    int spoutCount = Integer.valueOf(topologyConfig.getProperty("kafka.cboss.cdr.spout.thread.count"));
    builder.setSpout(KAFKA_CDR_SPOUT_ID, kafkaSpout, spoutCount)
           .setNumTasks(Integer.valueOf(topologyConfig.getProperty(KAFKA_CDR_SPOUT_NUM_TASKS)));
}

private SpoutConfig createKafkaCDRSpoutConfig() {
    BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("kafka.zookeeper.broker.host"));
    String topic = topologyConfig.getProperty("kafka.cboss.cdr.topic");
    String zkRoot = topologyConfig.getProperty("kafka.cboss.cdr.zkRoot");
    String consumerGroupId = topologyConfig.getProperty("kafka.cboss.cdr.consumerId");

    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new CbossCdrScheme());
    kafkaSpoutConfig.ignoreZkOffsets = true;
    kafkaSpoutConfig.fetchSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.fetchSizeBytes"));
    kafkaSpoutConfig.bufferSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.bufferSizeBytes"));
    return kafkaSpoutConfig;
}

public void configureKafkaSpoutBandwidthTesterBolt(TopologyBuilder topologyBuilder) {
    SimpleAckerBolt b = new SimpleAckerBolt();
    topologyBuilder.setBolt(SPOUT_BANDWIDTH_TESTER_BOLT_ID, b, Integer.valueOf(topologyConfig.getProperty(CFG_SIMPLE_ACKER_BOLT_PARALLELISM)))
                   .setNumTasks(Integer.valueOf(topologyConfig.getProperty(SPOUT_BANDWIDTH_TESTER_BOLT_NUM_TASKS)))
                   .localOrShuffleGrouping(KAFKA_CDR_SPOUT_ID);
}
Other topology settings:
topology.max.spout.pending=250
topology.executor.receive.buffer.size=1024
topology.executor.send.buffer.size=1024
topology.receiver.buffer.size=8
topology.transfer.buffer.size=1024
topology.acker.executors=1
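For reference, the same settings expressed programmatically, assuming they are applied to the topology Config at submit time (constants from backtype.storm.Config):
Config conf = new Config();
conf.setMaxSpoutPending(250);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 1024);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 1024);
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 1024);
conf.setNumAckers(1);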
I'm launching my topology with 1 worker, 1 Kafka spout, and 1 SimpleAckerBolt.
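Roughly, the launch looks like this sketch (the topology name is hypothetical):
conf.setNumWorkers(1);  // single worker for this test
TopologyBuilder builder = new TopologyBuilder();
configureTopology(builder);  // wires up the spout and bolt shown above
StormSubmitter.submitTopology("kafka-spout-bandwidth-test", conf, builder.createTopology());  // hypothetical name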
That's what I get in the Storm UI:
Okay, I got 1.5M tuples in 10 minutes. The bolt's capacity is around 0.5. So my logic was simple: if I double the spout and bolt parallelism hints, I'll get double the performance. The next test was with 1 worker, 2 Kafka spouts, 2 SimpleAckerBolts, and topology.acker.executors=2. Here are the results:
So I get worse performance with an increased parallelism hint. Why could that happen? How can I increase the tuples-per-second throughput? Actually, any test with a spout parallelism hint greater than 2 shows a worse result than a single spout executor.
I've already checked:
1) It's not Kafka's fault. The topic has 20 partitions across 2 brokers, and the topology scales to 4x the performance on 4 workers.
2) It's not the server's fault. The server has 40 cores and 32 GB of RAM. While running, the topology consumes around 1/8 of the CPU and almost no RAM.
3) Changing the topology.max.spout.pending parameter doesn't help.
4) Increasing the bolt or acker parallelism hint even further doesn't help.
Upvotes: 1
Views: 1081
Reputation: 3172
So it seems as though you're hitting the performance limit of a single worker: you're giving that one worker too much to do, and it can't handle it all.
At this point, if you want to further increase the performance of your system, you have two options: add more workers, or give the one worker you have more resources.
If you don't want to add more workers, then what's left is to configure your one worker. Investigate its configuration to give it more memory, more CPU, etc. Look over the default configuration options for Storm and see whether tweaking some values gives you better performance. Some of the configs that seem more likely to help than others (see the sketch after this list):
worker.heap.memory.mb:
worker.childopts:
supervisor.childopts:
supervisor.memory.capacity.mb:
supervisor.cpu.capacity:
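For example, to experiment with a larger worker heap without editing storm.yaml cluster-wide, you can override the worker JVM options per topology. A minimal sketch, assuming your Storm version supports topology.worker.childopts; the heap size is purely illustrative:
Config conf = new Config();
// Appended to the cluster-level worker.childopts when the worker JVM is launched;
// -Xmx4g is an illustrative value, not a recommendation.
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4g");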
Upvotes: 0