Reputation: 511
I'm using Apache Storm 0.10.0-beta1 and have started converting some topologies to Flux. I decided to start with a simple topology that reads from one Kafka queue and writes to a different Kafka queue. I get the error below and am having a difficult time figuring out what is wrong. The topology YAML file follows the error.
Parsing file: /Users/frank/src/mapper/mapper.yaml
388 [main] INFO o.a.s.f.p.FluxParser - loading YAML from input stream...
391 [main] INFO o.a.s.f.p.FluxParser - Not performing property substitution.
391 [main] INFO o.a.s.f.p.FluxParser - Not performing environment variable substitution.
466 [main] INFO o.a.s.f.FluxBuilder - Detected DSL topology...
Exception in thread "main" java.lang.NullPointerException
at org.apache.storm.flux.FluxBuilder.canInvokeWithArgs(FluxBuilder.java:561)
at org.apache.storm.flux.FluxBuilder.findCompatibleConstructor(FluxBuilder.java:392)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildSpout(FluxBuilder.java:361)
at org.apache.storm.flux.FluxBuilder.buildSpouts(FluxBuilder.java:349)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:84)
at org.apache.storm.flux.Flux.runCli(Flux.java:153)
at org.apache.storm.flux.Flux.main(Flux.java:98)
Topology yaml:
name: "mapper-topology"
config:
  topology.workers: 1
  topology.debug: true
  kafka.broker.properties.metadata.broker.list: "localhost:9092"
  kafka.broker.properties.request.required.acks: "1"
  kafka.broker.properties.serializer.class: "kafka.serializer.StringEncoder"

# component definitions
components:
  - id: "topicSelector"
    className: "storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "schemaq"
  - id: "kafkaMapper"
    className: "storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"

# spout definitions
spouts:
  - id: "kafka-spout"
    className: "storm.kafka.SpoutConfig"
    parallelism: 1
    constructorArgs:
      - ref: "zkHosts"
      - "mapperq"
      - "/mapperq"
      - "id-mapperq"
    properties:
      - name: "forceFromStart"
        value: true
      - name: "scheme"
        ref: "stringMultiScheme"

# bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "storm.kafka.bolt.KafkaBolt"
    parallelism: 1
    configMethods:
      - name: "withTopicSelector"
        args: [ref: "topicSelector"]
      - name: "withTupleToKafkaMapper"
        args: [ref: "kafkaMapper"]

# streams
streams:
  - name: "kafka-spout --> kafka-bolt"
    from: "kafka-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE
And here is the command:
storm jar /Users/frank/src/mapper/target/mapper-0.1.0-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local mapper.yaml
Upvotes: 0
Views: 1467
Reputation: 1708
The spout's className should be storm.kafka.KafkaSpout, not storm.kafka.SpoutConfig. SpoutConfig is only the configuration object, so define it in the "components" section and have the spout reference it as a constructor argument.
See https://github.com/apache/storm/blob/master/external/flux/flux-examples/src/main/resources/kafka_spout.yaml for how to set up KafkaSpout from Flux.
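For illustration, here is a minimal sketch of how the components and spouts sections of the question's file might look with that change, modeled on the linked example. Note that the question's YAML references "zkHosts" and "stringMultiScheme" without defining them, so the sketch defines those too. The ZooKeeper address "localhost:2181" and the ids "stringScheme" and "spoutConfig" are assumptions chosen for this example, and the class names use the 0.10.x packages (storm.kafka, backtype.storm); adjust them to your environment and version.

```yaml
components:
  # Scheme used to deserialize Kafka messages as strings
  - id: "stringScheme"
    className: "storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "backtype.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  # ZooKeeper connection used by the spout (address is an assumption)
  - id: "zkHosts"
    className: "storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"

  # SpoutConfig is a plain component, not a spout
  - id: "spoutConfig"
    className: "storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"   # brokerHosts
      - "mapperq"        # topic
      - "/mapperq"       # zkRoot
      - "id-mapperq"     # id
    properties:
      # Property name kept from the question; it may differ in other storm-kafka versions
      - name: "forceFromStart"
        value: true
      - name: "scheme"
        ref: "stringMultiScheme"

spouts:
  - id: "kafka-spout"
    className: "storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "spoutConfig"
```

The "topicSelector" and "kafkaMapper" components from the question would stay in the same components list, and the bolts and streams sections can remain unchanged.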
Upvotes: 1