Nitin

Reputation: 165

Field Grouping for a Kafka Spout

Can fields grouping be done on tuples emitted by a Kafka spout? If yes, how does Storm know the fields in a Kafka record?

Upvotes: 3

Views: 1230

Answers (3)

shane

Reputation: 509

TL;DR: The default implementation of KafkaSpout declares the following output fields in declareOutputFields:

new Fields("topic", "partition", "offset", "key", "value");

So, when building the topology, you can group directly on the "key" field:

topologyBuilder.setSpout(spoutName, mySpout, parallelismHintSpout);
topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));
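To see why grouping on "key" keeps records with the same key together, here is a simplified, dependency-free model. It is not Storm's actual grouping code: it assumes the target task is picked by hashing the selected field values and taking the result modulo the number of bolt tasks. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of a fields grouping (NOT Storm's real implementation).
// Assumption: target task = hash(selected field values) mod number of tasks.
class FieldsGroupingSketch {
    // Output fields as declared by the default KafkaSpout record translator.
    static final List<String> SPOUT_FIELDS =
            Arrays.asList("topic", "partition", "offset", "key", "value");

    // Picks a task index in [0, numTasks) using only the grouping fields.
    static int chooseTask(List<Object> tuple, List<String> groupFields, int numTasks) {
        Object[] selected = new Object[groupFields.size()];
        for (int i = 0; i < groupFields.size(); i++) {
            int idx = SPOUT_FIELDS.indexOf(groupFields.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown field: " + groupFields.get(i));
            }
            selected[i] = tuple.get(idx);
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(Arrays.deepHashCode(selected), numTasks);
    }

    public static void main(String[] args) {
        List<String> byKey = Arrays.asList("key");
        List<Object> t1 = Arrays.<Object>asList("orders", 0, 10L, "user-42", "a");
        List<Object> t2 = Arrays.<Object>asList("orders", 3, 99L, "user-42", "b");
        // Same key, different partition/offset/value: both land on the same task.
        System.out.println(chooseTask(t1, byKey, 4) == chooseTask(t2, byKey, 4));
    }
}
```

Only the "key" value feeds the hash, so topic, partition, offset and value have no influence on which task receives the tuple.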

Details: A look at the source code shows the following.

In KafkaSpout, declareOutputFields is implemented as follows:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
    for (String stream : translator.streams()) {
        declarer.declareStream(stream, translator.getFieldsFor(stream));
    }
}

It gets the fields from the RecordTranslator interface, whose instance is fetched from kafkaSpoutConfig, i.e. a KafkaSpoutConfig<K, V>. KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig (this is slightly different in version 1.1.1). Its builder uses a DefaultRecordTranslator. If you check the fields declared in that class, you will find:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

So we can use new Fields("key") directly for fields grouping in the topology code:

topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));
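The grouping works because the translator emits one value per declared field, in the same order as the field names, so "key" always refers to the same position in the tuple. A minimal sketch of that idea, assuming a hypothetical SimpleRecord class in place of Kafka's real ConsumerRecord and a hand-written apply method in place of DefaultRecordTranslator:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Kafka consumer record
// (not org.apache.kafka.clients.consumer.ConsumerRecord).
class SimpleRecord<K, V> {
    final String topic;
    final int partition;
    final long offset;
    final K key;
    final V value;

    SimpleRecord(String topic, int partition, long offset, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

// Sketch of what a default translator does: emit the record's parts
// in exactly the order of the declared field names.
class DefaultTranslatorSketch {
    static final List<String> FIELDS =
            Arrays.asList("topic", "partition", "offset", "key", "value");

    static <K, V> List<Object> apply(SimpleRecord<K, V> r) {
        return Arrays.<Object>asList(r.topic, r.partition, r.offset, r.key, r.value);
    }

    public static void main(String[] args) {
        SimpleRecord<String, String> rec =
                new SimpleRecord<>("orders", 2, 17L, "user-42", "payload");
        List<Object> values = apply(rec);
        // The value for "key" is found at the position of "key" in FIELDS.
        System.out.println(values.get(FIELDS.indexOf("key")));
    }
}
```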

Upvotes: 0

gasparms

Reputation: 3354

KafkaSpout declares its output fields like any other component. My explanation is based on the current implementation of KafkaSpout.

In the KafkaSpout.java class, the declareOutputFields method calls getOutputFields() on the scheme configured in KafkaConfig:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_spoutConfig.scheme.getOutputFields());
}

By default, KafkaConfig uses RawMultiScheme, which implements this method as follows:

@Override
public Fields getOutputFields() {
    return new Fields("bytes");
}

So what does this mean? If you declare a bolt that reads tuples from KafkaSpout with fieldsGrouping, every tuple with an equal "bytes" field will be processed by the same task. If you want to emit other fields, you should implement a new scheme for your needs.
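As an illustration of "implement a new scheme", here is a dependency-free sketch. SimpleScheme is a hypothetical stand-in shaped like Storm's Scheme interface (the real one has a deserialize method whose parameter type differs between Storm versions), and the "key:value" UTF-8 message format is an assumption made only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Storm's Scheme interface (not the real API).
interface SimpleScheme {
    List<Object> deserialize(byte[] payload);
    List<String> getOutputFields();
}

// Hypothetical scheme: assumes each message is UTF-8 text "key:value"
// and exposes the two parts as separate "key" and "value" fields.
class KeyValueScheme implements SimpleScheme {
    @Override
    public List<Object> deserialize(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int sep = text.indexOf(':');
        if (sep < 0) {
            // No separator: emit a null key and the whole message as the value.
            return Arrays.<Object>asList(null, text);
        }
        // Split at the first ':' only, so the value may itself contain ':'.
        return Arrays.<Object>asList(text.substring(0, sep), text.substring(sep + 1));
    }

    @Override
    public List<String> getOutputFields() {
        return Arrays.asList("key", "value");
    }

    public static void main(String[] args) {
        KeyValueScheme scheme = new KeyValueScheme();
        List<Object> tuple = scheme.deserialize("user-42:clicked".getBytes(StandardCharsets.UTF_8));
        System.out.println(scheme.getOutputFields() + " -> " + tuple);
    }
}
```

With a real scheme along these lines plugged into the spout config (storm-kafka wraps a single-tuple scheme with SchemeAsMultiScheme), a bolt could use fieldsGrouping on "key" instead of on the raw "bytes".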

Upvotes: 1

Chiron

Reputation: 20245

Fields grouping (and stream grouping in general) in Storm is declared on bolts, not on spouts. This is done through the InputDeclarer interface: when you call setBolt() on TopologyBuilder, it returns a BoltDeclarer, which is an InputDeclarer.
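The point is that the subscribing bolt says which component it reads from and how that stream is split among its own tasks. A toy model of that fluent, bolt-side declaration (ToyBoltDeclarer is hypothetical and not Storm's API; it only records the declarations as strings):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (NOT Storm's API): groupings belong to the consuming bolt's
// input declaration, one entry per source component it subscribes to.
class ToyBoltDeclarer {
    final List<String> inputs = new ArrayList<>();

    ToyBoltDeclarer fieldsGrouping(String sourceComponent, String... fields) {
        inputs.add(sourceComponent + " fields" + Arrays.toString(fields));
        return this; // returning this allows chained declarations
    }

    ToyBoltDeclarer shuffleGrouping(String sourceComponent) {
        inputs.add(sourceComponent + " shuffle");
        return this;
    }

    public static void main(String[] args) {
        // Mirrors the shape of setBolt(...).fieldsGrouping(...).shuffleGrouping(...)
        ToyBoltDeclarer bolt = new ToyBoltDeclarer()
                .fieldsGrouping("kafka-spout", "key")
                .shuffleGrouping("other-spout");
        System.out.println(bolt.inputs);
    }
}
```

The spout side has no equivalent call: it only declares which fields it emits, and each downstream bolt decides how to group on them.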

Upvotes: 2
