Reputation: 71
I'm new to Apache Storm and trying to get my feet wet.
Right now I simply want to log or print incoming Kafka messages which are received as byte arrays of ProtoBuf objects.
I need to do this within a Java Spring application.
I'm using Kafka 0.11.0.2
I'm using Storm 1.1.2 and have storm-core, storm-kafka, and storm-starters in my pom.
Main service class example
//annotations for spring
public class MyService{
public static void main(String[] args){
SpringApplication.run(MyService.class, args);
}
@PostConstruct
public void postConstruct() throws Exception {
SpoutConfig spoutConfig = new SpoutConfig(new ZKHosts("localhost:9092"), "topic", "/topic", "storm-spout");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("storm-spout", kafkaSpout);
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("storm-spout");
Config config = new Config();
config.setDebug(true);
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
private class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector){
System.out.println("\n\n INPUT: "+input.toString()+"\n\n");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){}
}
}
I build a docker image from this with a Dockerfile that I know works with my environment for other spring apps and run it in a container it throws an exception and hangs.
The exception is java.io.NotSerializeableException
and I see Caused by java.lang.IllegalStateException: Bolt 'printer' contains a non-seriablizeable field of type my.package.MyService$$EnhancerBySpringCGLIB$$696afb49, which was instantiated prior to topology creation. my.package.MyService$$EnhancerBySpringCLGIB$$696afb49 should be instantiated within the prepare method of 'printer at the earliest.
I figure maybe it's because storm is trying and failing to serialize the incoming byte array but I'm not sure how to remedy that and I haven't seen a lot of people trying to do this.
I was using this as a reference. https://github.com/thehydroimpulse/storm-kafka-starter/blob/master/src/jvm/storm/starter/KafkaTopology.java
Upvotes: 2
Views: 422
Reputation: 3651
Either declare PrinterBolt in a new file, or make the class static. The problem you're running into is that PrinterBolt is a non-static inner class of MyService, which means it contains a reference to the outer MyService class. Since MyService isn't serializable, PrinterBolt isn't either. Storm requires bolts to be serializable.
Also unrelated to the error you're seeing, you might want to consider using storm-kafka-client over storm-kafka, since the latter is deprecated.
Upvotes: 2