Asier Gomez

Reputation: 6578

How to make a synchronous KafkaSpout on Storm

I am trying to make a Kafka consumer that synchronously consumes messages from Kafka.

The actual problem I am having is that messages are queuing up in the Storm spout.

What I am trying to do is make Storm wait for the Kafka ACK reply and only then consume the next message.

I am using the Storm KafkaSpout:

/**
 * Creates a configured kafka spout.
 * @param topicName Topic where the kafka spout subscribes
 * @return An instance of configured KafkaSpout
 */
public KafkaSpout getkafkaSpout(String topicName) {
    return new KafkaSpout(this.getSpoutConfig(topicName));
}

/**
 * Create the necessary configuration to create a new kafka spout.
 * @param topicName Topic where the kafka spout subscribes
 * @return Spout configuration
 */
public SpoutConfig getSpoutConfig(String topicName) {
    SpoutConfig spoutConfig = new SpoutConfig(this.getZkHosts(), topicName, "",
            String.join("-", topicName, RandomStringUtils.randomAlphanumeric(20)));
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    return spoutConfig;
}

builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);

I have updated to Storm 2.0.0 and now use storm-kafka-client. But if I limit the number of pending tuples to 50 with setMaxSpoutPending(50), Storm stops consuming when I send a lot of data to Kafka.

I have configured the Kafka consumer with the following config:

KafkaSpoutConfig spoutConf = KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
        .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") // Session timeout (in ms)
        .setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") // Request timeout (in ms)
        .setOffsetCommitPeriodMs(10000)       // How often offsets are committed (in ms)
        .setFirstPollOffsetStrategy(LATEST)   // Start from the latest messages
        .setRetry(kafkaSpoutRetryService)
        .build();

When Storm has consumed 50 messages, the same number as the MaxSpoutPending setting, it stops consuming more. Maybe the next bolt is not sending the ACK correctly? I use the following bolt after the KafkaSpout:

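import java.util.Arrays;
import java.util.List;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class testBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("MQTTmessage"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector boc) {
        System.out.println("\n\n\n\nREACHED THE SPLIT TEXT BOLT OK\n\n");
        System.out.println("TUPLE " + tuple);
        // Field 4 is the record value, assuming the spout's default
        // (topic, partition, offset, key, value) tuple layout.
        String text = tuple.getString(4);
        List<String> lines = Arrays.asList(text.split("\\r?\\n"));

        // Emit one tuple per line of the message.
        lines.forEach(line -> {
            boc.emit(new Values(line));
        });
    }
}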

Upvotes: 0

Views: 254

Answers (1)

Stig Rohde Døssing

Reputation: 3651

Regarding throttling the spout: Yes, you can do this by setting the topology.max.spout.pending option in your topology configuration to 1. I wouldn't really recommend it if you want good throughput, but I'll assume you've already considered carefully why you need the topology to behave this way.
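
To make the effect of that limit concrete, here is a minimal sketch in plain Java. It is not Storm code: PendingGate is a hypothetical class that only mimics how the spout is held back once the number of emitted but unacked tuples reaches the limit. With a limit of 1, the next tuple can only go out after the previous one has been acked. The same logic suggests why a limit of 50 would stall after 50 tuples if the acks never arrive (at least until the tuples time out).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the spout's pending-tuple accounting; not a Storm class.
final class PendingGate {
    // Plays the role of topology.max.spout.pending.
    private final int maxPending;
    // Ids of tuples that were emitted but not yet acked.
    private final Set<String> pending = new HashSet<>();

    PendingGate(int maxPending) {
        if (maxPending < 1) {
            throw new IllegalArgumentException("maxPending must be >= 1");
        }
        this.maxPending = maxPending;
    }

    /** Emits the tuple only if the pending limit allows it; returns whether it was emitted. */
    boolean tryEmit(String tupleId) {
        if (pending.size() >= maxPending) {
            return false; // limit reached: nothing new goes out until something is acked
        }
        return pending.add(tupleId);
    }

    /** Acking a tuple frees its slot so the next one can be emitted. */
    void ack(String tupleId) {
        pending.remove(tupleId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingGate gate = new PendingGate(1);
        System.out.println(gate.tryEmit("msg-1")); // true: the slot is free
        System.out.println(gate.tryEmit("msg-2")); // false: msg-1 is still pending
        gate.ack("msg-1");
        System.out.println(gate.tryEmit("msg-2")); // true: the ack freed the slot
    }
}
```

In the real topology the equivalent of new PendingGate(1) would be calling setMaxSpoutPending(1) on the topology configuration, the same call you already use with 50.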

Regarding the new spout: Is stream1:9092 the server Kafka is running on, and is kafkaToStormAlarms the topic you're sending to? If not, that's probably your problem. Otherwise, check the worker logs in storm/logs/workers-artifacts; they may tell you why the spout isn't emitting anything.

Finally yes, you should absolutely be using storm-kafka-client instead of storm-kafka, or you will not be able to upgrade to Storm 2.0.0, or the newest Kafka versions for that matter.

Upvotes: 1
