frank

Reputation: 511

Apache Storm Flux Simple KafkaSpout --> KafkaBolt NullPointerException

I'm using Apache Storm 0.10.0-beta1 and have started converting some topologies to Flux. I decided to start with a simple topology that reads from one Kafka queue and writes to a different Kafka queue. I get the error below and am having a difficult time figuring out what is wrong. The topology YAML file follows the error.

Parsing file: /Users/frank/src/mapper/mapper.yaml
388  [main] INFO  o.a.s.f.p.FluxParser - loading YAML from input stream...
391  [main] INFO  o.a.s.f.p.FluxParser - Not performing property substitution.
391  [main] INFO  o.a.s.f.p.FluxParser - Not performing environment variable substitution.
466  [main] INFO  o.a.s.f.FluxBuilder - Detected DSL topology...
Exception in thread "main" java.lang.NullPointerException
at org.apache.storm.flux.FluxBuilder.canInvokeWithArgs(FluxBuilder.java:561)
at org.apache.storm.flux.FluxBuilder.findCompatibleConstructor(FluxBuilder.java:392)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildSpout(FluxBuilder.java:361)
at org.apache.storm.flux.FluxBuilder.buildSpouts(FluxBuilder.java:349)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:84)
at org.apache.storm.flux.Flux.runCli(Flux.java:153)
at org.apache.storm.flux.Flux.main(Flux.java:98)

Topology yaml:

name: "mapper-topology"
config:
  topology.workers: 1
  topology.debug: true
  kafka.broker.properties.metadata.broker.list: "localhost:9092"
  kafka.broker.properties.request.required.acks: "1"
  kafka.broker.properties.serializer.class: "kafka.serializer.StringEncoder"

# component definitions
components:
  - id: "topicSelector"
    className: "storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "schemaq"
  - id: "kafkaMapper"
    className: "storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"

# spout definitions
spouts:
  - id: "kafka-spout"
    className: "storm.kafka.SpoutConfig"
    parallelism: 1
    constructorArgs:
      - ref: "zkHosts"
      - "mapperq"
      - "/mapperq"
      - "id-mapperq"
    properties:
      - name: "forceFromStart"
        value: true
      - name: "scheme"
        ref: "stringMultiScheme"

# bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "storm.kafka.bolt.KafkaBolt"
    parallelism: 1
configMethods:
  - name: "withTopicSelector"
    args: [ref: "topicSelector"]
  - name: "withTupleToKafkaMapper"
    args: [ref: "kafkaMapper"]

# streams
streams:
  - name: "kafka-spout --> kafka-bolt"
    from: "kafka-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE

And here is the command:

storm jar /Users/frank/src/mapper/target/mapper-0.1.0-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local mapper.yaml

Upvotes: 0

Views: 1467

Answers (1)

Jungtaek Lim

Reputation: 1708

The spout's className should be storm.kafka.KafkaSpout, not storm.kafka.SpoutConfig. Define the SpoutConfig in the "components" section and have the spout refer to it.

See https://github.com/apache/storm/blob/master/external/flux/flux-examples/src/main/resources/kafka_spout.yaml for an example of how to set up a KafkaSpout from Flux.
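
As a rough sketch of what that could look like for the topology in the question (untested, and modeled on the linked example): the SpoutConfig moves into "components", and the spout takes it as a constructor argument. The question's YAML refers to "zkHosts" and "stringMultiScheme" without defining them, so they are defined here as well. The ZooKeeper address "localhost:2181" and the component id "spoutConfig" are assumptions; adjust them to your setup.

```yaml
components:
  - id: "stringScheme"
    className: "storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "backtype.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      # assumed ZooKeeper connect string; not given in the question
      - "localhost:2181"

  # the SpoutConfig is a plain component, not a spout
  - id: "spoutConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "mapperq"
      # zkRoot
      - "/mapperq"
      # id
      - "id-mapperq"
    properties:
      - name: "forceFromStart"
        value: true
      - name: "scheme"
        ref: "stringMultiScheme"

spouts:
  - id: "kafka-spout"
    className: "storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "spoutConfig"
```

The "topicSelector" and "kafkaMapper" components from the question would stay in the same "components" list alongside these.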

Upvotes: 1
