Reputation: 3099
I have defined a basic Storm topology with a spout that consumes from Kafka (the producer lives in a separate Kafka module). However, when I run the application I get this error:
java.lang.RuntimeException: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
at org.apache.storm.utils.Utils$1.run(Utils.java:407) ~[storm-client-2.1.0.jar:2.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
How can I set the group id? I am running Storm 2.1.0 locally.
Here is the code for the topology:
val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val brokerHosts = new ZkHosts(bootstrapServers)
val topologyBuilder = new TopologyBuilder()
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "tweets").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
Upvotes: 1
Views: 444
Reputation: 401
You should call setProp(java.lang.String, java.lang.Object) on the KafkaSpoutConfig builder, passing ConsumerConfig.GROUP_ID_CONFIG as the key and your consumer group id as the value, before calling build().
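As a rough sketch of what that could look like with the topology from the question, assuming storm-kafka-client 2.1.0 and kafka-clients are on the classpath. The group id "storm-tweets-group" and the object name are made up for illustration; any non-empty group name should work.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.topology.TopologyBuilder

object KafkaGroupIdExample {
  def main(args: Array[String]): Unit = {
    val bootstrapServers = "localhost:9092"

    // Set group.id on the builder before build().
    // "storm-tweets-group" is a hypothetical name chosen for this example.
    val spoutConfig = KafkaSpoutConfig
      .builder(bootstrapServers, "tweets")
      .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-tweets-group")
      .build()

    val topologyBuilder = new TopologyBuilder()
    // Int.box makes the Int -> java.lang.Number conversion explicit.
    topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), Int.box(1))
  }
}
```

ConsumerConfig.GROUP_ID_CONFIG is just the string constant "group.id", so passing that literal as the key should be equivalent. The rest of the topology (Config, LocalCluster, submitTopology) can stay as in the question.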
Upvotes: 2