Reputation: 6578
I am trying to make a Kafka consumer that synchronously consumes messages from Kafka.
The actual problem I am having is that messages queue up inside the Storm spout.
What I am trying to do is make Storm wait for the Kafka ACK reply and only then let Storm consume the next message.
I am using the Storm KafkaSpout:
/**
 * Creates a configured Kafka spout.
 * @param topicName topic to which the Kafka spout subscribes
 * @return an instance of a configured KafkaSpout
 */
public KafkaSpout getkafkaSpout(String topicName) {
    return new KafkaSpout(this.getSpoutConfig(topicName));
}

/**
 * Creates the configuration needed for a new Kafka spout.
 * @param topicName topic to which the Kafka spout subscribes
 * @return spout configuration
 */
public SpoutConfig getSpoutConfig(String topicName) {
    SpoutConfig spoutConfig = new SpoutConfig(this.getZkHosts(), topicName, "",
            String.join("-", topicName, RandomStringUtils.randomAlphanumeric(20)));
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    return spoutConfig;
}
builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);
I have since updated to Storm 2.0.0 and I now use storm-kafka-client. But if I configure the
Storm queue with setMaxSpoutPending(50);
and then send a lot of data to Kafka, Storm stops consuming it.
I have configured the Kafka consumer with the following config:
KafkaSpoutConfig spoutConf = KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
        .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000")  // session timeout
        .setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")  // request timeout
        .setOffsetCommitPeriodMs(10000)        // automatic offset-commit period (in ms)
        .setFirstPollOffsetStrategy(LATEST)    // start from the latest messages
        .setRetry(kafkaSpoutRetryService)
        .build();
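For reference, a minimal sketch of the same builder with the processing guarantee made explicit. KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE is a storm-kafka-client option under which offsets are only committed for tuples the topology has acked, which is the behavior the question is after; the broker address and topic are the ones from the snippet above, and the generic types are an assumption about the key/value deserializers:

```java
// Sketch (assumes storm-kafka-client 2.x on the classpath):
// offsets are committed only for acked tuples under AT_LEAST_ONCE.
KafkaSpoutConfig<String, String> spoutConf =
        KafkaSpoutConfig.<String, String>builder("stream1:9092", "kafkaToStormAlarms")
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
                .setOffsetCommitPeriodMs(10000)      // how often acked offsets are committed
                .setFirstPollOffsetStrategy(LATEST)  // start from the latest messages
                .build();
```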
When Storm has consumed 50 messages, the same as the MaxSpoutPending configuration, it stops consuming more. Maybe the next bolt is not ACKing the tuples correctly? I use the following bolt after the KafkaSpout:
public class testBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("MQTTmessage"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector boc) {
        System.out.println("\n\n\n\nARRIVES OK AT THE SPLIT TEXT BOLT\n\n");
        System.out.println("TUPLE " + tuple);
        // Field 4 of the storm-kafka-client spout's output is the record value
        String text = tuple.getString(4);
        List<String> lines = Arrays.asList(text.split("\\r?\\n"));
        lines.forEach(line -> boc.emit(new Values(line)));
    }
}
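As a point of comparison: BaseBasicBolt acks each input tuple automatically once execute returns without throwing, so the bolt above should ack on its own. The same bolt written against BaseRichBolt has to anchor and ack explicitly, which makes the ack flow visible; a sketch, keeping the same assumed field layout (value at index 4), with the class name chosen here for illustration:

```java
// Sketch (assumes Storm 2.x on the classpath). If collector.ack(tuple) is never
// reached (e.g. an exception is thrown first), the tuple stays pending and a
// small maxSpoutPending will stall the spout.
public class TestRichBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext ctx, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String text = tuple.getString(4);             // record value (assumed layout)
        for (String line : text.split("\\r?\\n")) {
            collector.emit(tuple, new Values(line));  // anchor to the input tuple
        }
        collector.ack(tuple);                         // releases one maxSpoutPending slot
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("MQTTmessage"));
    }
}
```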
Upvotes: 0
Views: 254
Reputation: 3651
Regarding throttling the spout: yes, you can do this by setting the topology.max.spout.pending option in your topology configuration to 1. I wouldn't really recommend it if you want to have good throughput, but I'll assume you've already considered carefully why you need the topology to behave this way.
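Assuming a standard topology submission, that setting can be applied through the Config object (the topology name here is a placeholder):

```java
// Sketch: limit each spout task to one unacked tuple in flight.
Config conf = new Config();
conf.setMaxSpoutPending(1);  // sets topology.max.spout.pending
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
```

Note that the limit applies per spout task, so with a parallelism of 2 the topology as a whole can still have two tuples pending.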
Regarding the new spout: is stream1:9092 the server Kafka is running on, and is kafkaToStormAlarms the topic you're sending to? If not, that's probably your problem. Otherwise, check the worker logs in storm/logs/workers-artifacts; they may tell you why the spout isn't emitting anything.
Finally, yes, you should absolutely be using storm-kafka-client instead of storm-kafka, or you will not be able to upgrade to Storm 2.0.0, or to the newest Kafka versions for that matter.
Upvotes: 1