Can fields grouping be done on tuples emitted by a Kafka spout? If so, how does Storm know the fields in a Kafka record?
Upvotes: 3
TL;DR
The default implementation of KafkaSpout declares the following output fields in declareOutputFields:
new Fields("topic", "partition", "offset", "key", "value");
So when building the topology, you can group on the key directly:
topologyBuilder.setSpout(spoutName, mySpout, parallelismHintSpout);
topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));
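To make the effect of that grouping concrete, here is a simplified plain-Java model (no Storm dependency) of how a fields grouping on "key" might route the spout's five-field tuples to bolt tasks. The class name FieldsGroupingModel is hypothetical, and the routing rule (hash the selected value, then take a non-negative modulus over the task count) is an assumption that approximates Storm's behavior rather than reproducing its source.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of fields grouping; NOT Storm's source code.
// Assumption: the target task is chosen by hashing the selected field values
// and taking a non-negative modulus over the number of bolt tasks.
class FieldsGroupingModel {
    // Output schema declared by the default KafkaSpout record translator.
    static final List<String> SPOUT_FIELDS =
            Arrays.asList("topic", "partition", "offset", "key", "value");

    // Returns the index of the bolt task that receives this tuple when the
    // bolt subscribes with fieldsGrouping(spoutName, new Fields(groupField)).
    static int chooseTask(List<Object> tupleValues, String groupField, int numTasks) {
        int idx = SPOUT_FIELDS.indexOf(groupField);
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown field: " + groupField);
        }
        Object[] selected = { tupleValues.get(idx) };
        return Math.floorMod(Arrays.deepHashCode(selected), numTasks);
    }

    public static void main(String[] args) {
        List<Object> a = Arrays.asList("orders", 0, 10L, "user-42", "payload-1");
        List<Object> b = Arrays.asList("orders", 3, 99L, "user-42", "payload-2");
        // Same key but different partition, offset and value: same task.
        System.out.println(chooseTask(a, "key", 4) == chooseTask(b, "key", 4)); // true
    }
}
```

Only the "key" position takes part in the hash, which is why records with the same Kafka key land on the same bolt task regardless of which partition they came from.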
Details: A look at the code shows that KafkaSpout implements declareOutputFields as follows:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
    for (String stream : translator.streams()) {
        declarer.declareStream(stream, translator.getFieldsFor(stream));
    }
}
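The loop above is what tells Storm the field names: at topology build time, each spout reports a list of fields per stream. The sketch below models that handshake in plain Java, with hypothetical stand-ins (Translator, DefaultLikeTranslator, RecordingDeclarer) in place of Storm's RecordTranslator and OutputFieldsDeclarer, and assumes the default translator exposes a single stream named "default".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of KafkaSpout.declareOutputFields; not Storm's real types.
class DeclareOutputFieldsModel {
    // Stand-in for RecordTranslator: which streams exist, and their fields.
    interface Translator {
        List<String> streams();
        List<String> getFieldsFor(String stream);
    }

    // Assumed to mirror the default translator: one "default" stream, five fields.
    static class DefaultLikeTranslator implements Translator {
        @Override
        public List<String> streams() {
            return Collections.singletonList("default");
        }

        @Override
        public List<String> getFieldsFor(String stream) {
            return Arrays.asList("topic", "partition", "offset", "key", "value");
        }
    }

    // Stand-in for OutputFieldsDeclarer: records the schema Storm would learn.
    static class RecordingDeclarer {
        final Map<String, List<String>> schema = new LinkedHashMap<>();

        void declareStream(String stream, List<String> fields) {
            schema.put(stream, fields);
        }
    }

    // Same loop as the spout: one declaration per stream the translator emits to.
    static void declareOutputFields(Translator translator, RecordingDeclarer declarer) {
        for (String stream : translator.streams()) {
            declarer.declareStream(stream, translator.getFieldsFor(stream));
        }
    }

    public static void main(String[] args) {
        RecordingDeclarer declarer = new RecordingDeclarer();
        declareOutputFields(new DefaultLikeTranslator(), declarer);
        System.out.println(declarer.schema); // {default=[topic, partition, offset, key, value]}
    }
}
```

Because a downstream fieldsGrouping names fields from this declared schema, a grouping on "key" is only meaningful if the translator actually declares a "key" field.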
It gets the fields from a RecordTranslator, whose instance is fetched from kafkaSpoutConfig, i.e. a KafkaSpoutConfig<K, V>. KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig (this is slightly different in version 1.1.1). Unless you configure a different translator, its builder uses DefaultRecordTranslator. If you check the Fields in that class, you will find:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
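Declaring the names is only half of the story; the emitted values must line up with them by position. The sketch below assumes a translator that builds values in the same order as FIELDS, and resolves a field name to its index, roughly as Storm's Tuple.getValueByField does. Record and RecordTranslationModel are hypothetical stand-ins, not storm-kafka-client classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of record translation; not storm-kafka-client code.
class RecordTranslationModel {
    // Field names in the same order as DefaultRecordTranslator.FIELDS.
    static final List<String> FIELDS =
            Arrays.asList("topic", "partition", "offset", "key", "value");

    // Minimal stand-in for a consumed Kafka record.
    static final class Record {
        final String topic;
        final int partition;
        final long offset;
        final String key;
        final String value;

        Record(String topic, int partition, long offset, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Builds tuple values positionally aligned with FIELDS.
    static List<Object> apply(Record r) {
        return Arrays.<Object>asList(r.topic, r.partition, r.offset, r.key, r.value);
    }

    // Resolves a field name to its position, then reads the value there.
    static Object valueByField(List<Object> values, String field) {
        int idx = FIELDS.indexOf(field);
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        return values.get(idx);
    }

    public static void main(String[] args) {
        List<Object> values = apply(new Record("orders", 2, 15L, "user-42", "hello"));
        System.out.println(valueByField(values, "key")); // user-42
    }
}
```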
So we can use new Fields("key") directly for fields grouping in the topology code:
topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));
Upvotes: 0
KafkaSpout declares its output fields like any other component. My explanation is based on the current implementation of KafkaSpout in the storm-kafka module.
In the KafkaSpout.java class, the declareOutputFields method calls getOutputFields() on the scheme configured in KafkaConfig:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_spoutConfig.scheme.getOutputFields());
}
By default, KafkaConfig uses RawMultiScheme, which implements this method as follows:
@Override
public Fields getOutputFields() {
    return new Fields("bytes");
}
So what does this mean? If you declare a bolt that reads tuples from KafkaSpout with fieldsGrouping on "bytes", every tuple with an equal "bytes" value is processed by the same task. If you want to emit different fields, you should implement a new scheme that fits your needs.
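As a sketch of such a scheme, assuming each Kafka message is UTF-8 text of the form "key:value", the class below splits it into "key" and "value" fields. To keep it compilable without Storm on the classpath, SimpleScheme is a hypothetical local interface shaped like Storm's Scheme (deserialize plus getOutputFields); the real interface returns a Fields object from getOutputFields, and its deserialize signature has changed across Storm versions, so check the one you build against.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in shaped like Storm's Scheme, for illustration only.
interface SimpleScheme {
    List<Object> deserialize(ByteBuffer ser);
    List<String> getOutputFields();
}

// Hypothetical scheme: assumes each message is UTF-8 text "key:value"
// and splits it into two fields so a bolt can group on "key".
class KeyValueScheme implements SimpleScheme {
    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        // Copy the readable bytes without moving the caller's buffer position.
        ByteBuffer buf = ser.duplicate();
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        String text = new String(bytes, StandardCharsets.UTF_8);
        int sep = text.indexOf(':');
        // Without a separator, treat the whole message as the key.
        String key = sep >= 0 ? text.substring(0, sep) : text;
        String value = sep >= 0 ? text.substring(sep + 1) : "";
        return Arrays.<Object>asList(key, value);
    }

    @Override
    public List<String> getOutputFields() {
        return Arrays.asList("key", "value");
    }
}
```

With storm-kafka, a real Scheme implementation is typically wrapped in SchemeAsMultiScheme and assigned to the spout config's scheme field; a downstream bolt can then use fieldsGrouping on "key" instead of "bytes".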
Upvotes: 1
Fields grouping (and grouping in general) in Storm is declared on bolts, not on spouts. This is done via the InputDeclarer interface: when you call setBolt() on TopologyBuilder, a BoltDeclarer, which extends InputDeclarer, is returned.
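A toy model of that API shape, in plain Java with hypothetical names (MiniTopologyBuilder, MiniInputDeclarer), shows why: registering a spout offers no grouping call, while setBolt returns a declarer whose grouping methods record which upstream component the bolt subscribes to. It only records subscriptions; it does not route tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model with hypothetical names; this is not Storm's TopologyBuilder.
class MiniTopologyBuilder {
    final List<String> spouts = new ArrayList<>();
    final List<String> subscriptions = new ArrayList<>();

    // Registering a spout returns nothing to group on: spouts only produce tuples.
    void setSpout(String id) {
        spouts.add(id);
    }

    // Registering a bolt returns a declarer for the bolt's inputs.
    MiniInputDeclarer setBolt(String id) {
        return new MiniInputDeclarer(id);
    }

    final class MiniInputDeclarer {
        private final String boltId;

        MiniInputDeclarer(String boltId) {
            this.boltId = boltId;
        }

        // Records that this bolt consumes sourceId, partitioned by the given fields.
        MiniInputDeclarer fieldsGrouping(String sourceId, String... fields) {
            subscriptions.add(boltId + " <- " + sourceId + " fields" + Arrays.toString(fields));
            return this; // fluent, so further inputs can be chained
        }

        // Records that this bolt consumes sourceId with a shuffle grouping.
        MiniInputDeclarer shuffleGrouping(String sourceId) {
            subscriptions.add(boltId + " <- " + sourceId + " shuffle");
            return this;
        }
    }

    public static void main(String[] args) {
        MiniTopologyBuilder builder = new MiniTopologyBuilder();
        builder.setSpout("kafka-spout");
        builder.setBolt("counter").fieldsGrouping("kafka-spout", "key");
        System.out.println(builder.subscriptions); // [counter <- kafka-spout fields[key]]
    }
}
```

The grouping lives on the subscribing side because the same spout output can feed several bolts, each partitioning it differently.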
Upvotes: 2