Reputation: 35
I am currently learning how to use apache storm and apache kafka by following courses provided by OpenClassRoom and my question is very similar to the ones we can find here and here.I have tried to solve my problem with the information provided on these posts but nothing changes, I still have the same error.
I can compile the code on eclipse
and can use the command mvn package
without any issue. I put everything I've found that reffered to kafka
and storm
in the pom.xml
Any advice would help me a lot.
You can find bellow the pom
and the code.
The pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="" xmlns:xsi=""
<!-- FIXME change it to the project's website -->
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<!-- clean lifecycle, see -->
<!-- default lifecycle, jar packaging: see -->
<!-- site lifecycle, see -->
and the code :
package velib;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.tuple.Fields;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.kafka.clients.consumer.ConsumerConfig;
public class App
public static void main( String[] args )throws Exception, AlreadyAliveException, InvalidTopologyException, AuthorizationException
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder<String, String> spoutConfigBuilder = KafkaSpoutConfig.builder("localhost:9092", "velib-stations");
Properties prop = new Properties();
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "city-stats");
KafkaSpoutConfig<String, String> spoutConfig =;
builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
builder.setBolt("station-parsing", new StationParsingBolt())
builder.setBolt("city-stats", new CityStatsBolt().withTumblingWindow(BaseWindowedBolt.Duration.of(1000*60*5)))
.fieldsGrouping("station-parsing", new Fields("city"));
builder.setBolt("save-results", new SaveResultsBolt())
.fieldsGrouping("city-stats", new Fields("city"));
StormTopology topology = builder.createTopology();
Config config = new Config();
String topologyName = "velib";
if(args.length > 0 && args[0].equals("remote")) {
StormSubmitter.submitTopology(topologyName, config, topology);
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, config, topology);
Upvotes: 1
Views: 421
Reputation: 191681
Remove old version of storm-kafka
, and Kafka server dependency kafka_2.13
since you only need storm-kafka-client
Working Maven packaging only means your code compiled. You need to use assembly plugin (preferred) or shade plugins (as answered in linked question) to include Storm Kafka client libraries as part of your JAR. You might also want to put <scope>provided</scope>
on the storm-core dependency.
$ jar tf target/untitled-1.0-SNAPSHOT-jar-with-dependencies.jar | grep KafkaSpoutConfig
Upvotes: 2