Reputation: 4882
I am using Kafka with Storm: Kafka sends/emits JSON strings to Storm, and within Storm I want to distribute the load to a couple of workers based on a key/field in the JSON. How do I do that? In my case, it is the groupid field in the JSON string.
For example, I have JSON like this:
{"groupid": 1234, "userid": 145, "comments": "I want to distribute all of group 1234 to one worker", "size": 50, "type": "group json"}
{"groupid": 1235, "userid": 134, "comments": "I want to distribute all of group 1235 to another worker", "size": 90, "type": "group json"}
{"groupid": 1234, "userid": 158, "comments": "I want to be sent to the same worker as group 1234", "size": 50, "type": "group json"}
=== Storm 0.9.4 is used ===
My source code is as follows:
import java.util.List;
import java.util.Map;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class KafkaBoltMain {
    private static final String SPOUTNAME = "TopicSpout";
    private static final String ANALYSISBOLT = "AnalysisWorker";
    private static final String CLIENTID = "Storm";
    private static final String TOPOLOGYNAME = "LocalTopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private OutputCollector _collector;
        private long groupid = -1L;
        private String log = "test";

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            List<Object> objs = tuple.getValues();
            int i = 0;
            for (Object obj : objs) {
                System.out.println(i + "th object's value is: " + obj.toString());
                i++;
            }
            // _collector.emit(new Values(groupid, log));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("groupid", "log"));
        }
    }

    public static void main(String[] args) {
        String zookeepers = null;
        String topicName = null;
        if (args.length == 2) {
            zookeepers = args[0];
            topicName = args[1];
        } else if (args.length == 1 && args[0].equalsIgnoreCase("help")) {
            System.out.println("xxxx");
            System.exit(0);
        } else {
            System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
            System.out.println("xxxx");
            System.exit(-1);
        }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "", // zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 2)
               .fieldsGrouping(SPOUTNAME, new Fields("groupid"));

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
    }
}
But when I submit the topology, it gives the following error:
12794 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component:
[AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
Why is there a non-existent fields warning message? Any hints?
Upvotes: 3
Views: 5066
Reputation: 16392
The Kafka spout with StringScheme declares a single output field named "str", so there is no "groupid" field for your bolt to group on; that is what the InvalidTopologyException is telling you. You need to pull the groupid attribute out of the JSON string and pass the two values (the JSON string and the groupId) along as a two-value tuple. When you declare the stream as part of the topology specification logic, give one of the fields the name "groupid" and things should work fine. If you don't want to modify the Kafka spout, you need an intermediary bolt whose sole purpose is to split the groupId out of the JSON object. The intermediary bolt could also use a direct stream (the emitDirect() method), basing the destination on the groupId in the JSON object; a sketch of both follows.
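Here is a minimal sketch of such an intermediary bolt, assuming json-simple is available on the classpath (Storm 0.9.x pulls it in as a dependency, but any JSON library would do); the class name JsonSplitterBolt and the component name "JsonSplitter" below are placeholders of my own, not anything from your code:

import java.util.Map;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class JsonSplitterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        // StringScheme emits one field named "str" holding the raw Kafka message
        String json = tuple.getString(0);
        // json-simple parses JSON integers as Long
        JSONObject obj = (JSONObject) JSONValue.parse(json);
        Long groupid = (Long) obj.get("groupid");
        // Anchor to the input tuple so failures are replayed from the spout
        _collector.emit(tuple, new Values(groupid, json));
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("groupid", "json"));
    }
}

Then wire the splitter between the spout and your bolt, and move the fields grouping onto the splitter's declared "groupid" field:

builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt("JsonSplitter", new JsonSplitterBolt(), 2)
       .shuffleGrouping(SPOUTNAME);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 2)
       .fieldsGrouping("JsonSplitter", new Fields("groupid"));

fieldsGrouping guarantees that every tuple with the same groupid lands on the same AppAnalysisBolt task. If you preferred the emitDirect() route instead, the splitter would declare a direct stream and pick the target task itself, roughly:

// in declareOutputFields: declarer.declare(true, new Fields("groupid", "json"));
// in prepare: targetTasks = context.getComponentTasks(ANALYSISBOLT);
// in execute:
int target = targetTasks.get((int) (groupid % targetTasks.size()));
_collector.emitDirect(target, tuple, new Values(groupid, json));

and the downstream bolt would subscribe with .directGrouping("JsonSplitter").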
This is one reason why I don't reuse the Kafka spout - there's often something else I want to do other than just blindly writing the data out onto a stream, but that's neither here nor there.
Upvotes: 2