NaptownCSC

Reading Kafka messages with Apache Storm in a Java Spring application causes NotSerializableException. Why?

I'm new to Apache Storm and trying to get my feet wet.

Right now I simply want to log or print incoming Kafka messages, which arrive as byte arrays of serialized Protobuf objects.

I need to do this within a Java Spring application.

I'm using Kafka 0.11.0.2.

I'm using Storm 1.1.2 and have storm-core, storm-kafka, and storm-starters in my pom.

Main service class example

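import javax.annotation.PostConstruct;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.springframework.boot.SpringApplication;

//annotations for spring
public class MyService{

    public static void main(String[] args){
        SpringApplication.run(MyService.class, args);
    }

    @PostConstruct
    public void postConstruct() throws Exception {
        // ZkHosts takes the ZooKeeper connect string (ZooKeeper's default port is 2181), not a Kafka broker address
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("localhost:2181"), "topic", "/topic", "storm-spout");

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("storm-spout", kafkaSpout);
        builder.setBolt("printer", new PrinterBolt())
            .shuffleGrouping("storm-spout");

        Config config = new Config();
        config.setDebug(true);
        config.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka", config, builder.createTopology());

        // Let the local topology run for 30 seconds, then stop it
        Thread.sleep(30000);
        cluster.shutdown();
    }

    private class PrinterBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector){
            System.out.println("\n\n INPUT: "+input.toString()+"\n\n");
        }

        // This bolt emits nothing, so it declares no output fields
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer){}
    }

}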

I build a Docker image from this with a Dockerfile that I know works in my environment for other Spring apps. When I run it in a container, it throws an exception and hangs.

The exception is java.io.NotSerializableException

and I see: Caused by: java.lang.IllegalStateException: Bolt 'printer' contains a non-serializable field of type my.package.MyService$$EnhancerBySpringCGLIB$$696afb49, which was instantiated prior to topology creation. my.package.MyService$$EnhancerBySpringCGLIB$$696afb49 should be instantiated within the prepare method of 'printer at the earliest.

I figured it might be because Storm is trying and failing to serialize the incoming byte array, but I'm not sure how to remedy that, and I haven't seen many people trying to do this.
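One way to test this hypothesis in isolation is to check whether a byte[] survives plain Java serialization, which is the mechanism behind java.io.NotSerializableException. This is a minimal sketch; the class and method names are hypothetical, and the payload is arbitrary bytes standing in for a serialized Protobuf message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

public class ByteArraySerializationCheck {

    // Writes the payload with plain Java serialization and reads it back.
    // Any serialization failure is rethrown as an unchecked exception.
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (byte[]) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Arbitrary bytes standing in for a Protobuf-encoded Kafka message (hypothetical payload)
        byte[] payload = {0x0a, 0x03, 0x66, 0x6f, 0x6f};
        byte[] copy = roundTrip(payload);
        System.out.println("round trip equal: " + Arrays.equals(payload, copy));
    }
}
```

If the round trip succeeds, the raw payload type is not what the exception is complaining about. The exception message itself also names MyService's CGLIB proxy, not byte[], as the non-serializable type.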

I was using this as a reference: https://github.com/thehydroimpulse/storm-kafka-starter/blob/master/src/jvm/storm/starter/KafkaTopology.java

Answers (1)

Stig Rohde Døssing

Either declare PrinterBolt in its own file, or make it a static nested class. The problem you're running into is that PrinterBolt is a non-static inner class of MyService, which means every PrinterBolt instance holds a hidden reference to the enclosing MyService instance (here, the Spring CGLIB proxy named in the exception). Since MyService isn't serializable, PrinterBolt isn't either, and Storm requires bolts to be serializable.
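The mechanism can be reproduced without Storm or Spring. Storm bolts are java.io.Serializable (IComponent extends Serializable), so plain Java serialization shows the same failure. This is a minimal sketch; Service, InnerBolt and StaticBolt are hypothetical stand-ins for MyService and PrinterBolt.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InnerClassSerializationDemo {

    // Stands in for the Spring-managed MyService: it does NOT implement Serializable.
    static class Service {
        String name = "service";

        // Non-static inner class: each instance keeps a reference to its enclosing Service.
        // describe() reads an outer field, so the compiler must keep that reference.
        class InnerBolt implements Serializable {
            String describe() {
                return "bolt owned by " + name;
            }
        }

        // Static nested class: no reference to any enclosing Service instance.
        static class StaticBolt implements Serializable {
        }

        InnerBolt newInnerBolt() {
            return new InnerBolt();
        }
    }

    // Returns true if Java serialization of obj succeeds, false on NotSerializableException.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // The message is the name of the class that could not be serialized
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        boolean inner = serializes(service.newInnerBolt());
        boolean nested = serializes(new Service.StaticBolt());
        System.out.println("inner class serializable: " + inner);
        System.out.println("static nested class serializable: " + nested);
    }
}
```

Serializing the inner bolt fails because Java serialization also writes the hidden reference to the non-serializable Service, while the static nested bolt serializes cleanly. Moving PrinterBolt to its own file removes the enclosing reference in the same way.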

Also, unrelated to the error you're seeing, you might want to consider using storm-kafka-client instead of storm-kafka, since the latter is deprecated.
