Reputation: 787
I want to instantiate a LocalCluster() and prevent it from running its own embedded ZooKeeper, so that it uses mine instead.
According to this issue, "https://issues.apache.org/jira/browse/STORM-213", this was resolved in version 0.9.3.
Can I have some sample code for this?
PS: I am integration testing my Storm topology, and I use Kafka and ZooKeeper as input to Storm. When I don't pass the ZooKeeper info to LocalCluster, I get this exception at the line "LocalCluster localCluster = new LocalCluster()":
2016-06-08 12:16:56,785 WARN [Thread-30] jmx.MBeanRegistry (MBeanRegistry.java:register(100)) - Failed to register MBean StandaloneServer_port-1
2016-06-08 12:16:56,785 WARN [Thread-30] server.ZooKeeperServer (ZooKeeperServer.java:registerJMX(387)) - Failed to register with JMX
javax.management.InstanceAlreadyExistsException: org.apache.ZooKeeperService:name0=StandaloneServer_port-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.zookeeper.jmx.MBeanRegistry.register(MBeanRegistry.java:96)
at org.apache.zookeeper.server.ZooKeeperServer.registerJMX(ZooKeeperServer.java:377)
at org.apache.zookeeper.server.ZooKeeperServer.startup(ZooKeeperServer.java:410)
at org.apache.zookeeper.server.NIOServerCnxnFactory.startup(NIOServerCnxnFactory.java:123)
And when I set "storm.zookeeper.servers" and "storm.zookeeper.port" for the local cluster, I get the exception below at the "localCluster.submitTopology()" line:
EndOfStreamException: Unable to read additional data from client sessionid 0x1552f0890b70000, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:26)
at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:301)
at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49)
at org.apache.storm.LocalCluster.submitTopology(Unknown Source)
Upvotes: 1
Views: 782
Reputation: 19
I was able to use Kafka as the input of a Storm topology by using the configuration described at this link: http://storm.apache.org/releases/1.0.2/storm-kafka.html
I created a dedicated Java class for the Storm config:
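import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class StormConfig {
    // ZooKeeper connect string used by Kafka, e.g. "localhost:2181"
    private String zooKeeperConnect;

    public StormConfig() {
    }

    public KafkaSpout getkafkaSpout(String topic) {
        return new KafkaSpout(this.getSpoutConfig(topic));
    }

    public SpoutConfig getSpoutConfig(String topic) {
        // Arguments: hosts, topic, ZooKeeper root path, consumer id
        SpoutConfig spoutConfig = new SpoutConfig(this.getZkHosts(), topic, "", topic);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Start reading from the earliest available offset
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        return spoutConfig;
    }

    public ZkHosts getZkHosts() {
        return new ZkHosts(getZooKeeperConnect());
    }

    public String getZooKeeperConnect() {
        return zooKeeperConnect;
    }

    public void setZooKeeperConnect(String zooKeeperConnect) {
        this.zooKeeperConnect = zooKeeperConnect;
    }
}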
Then I used the method from this class when creating the initial spout of the topology:
builder.setSpout("kafkaSpoutName", stormConfig.getkafkaSpout("topicName"))
        .setNumTasks(Constants.SPOUT_NUM_TASKS);
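In case it helps: the string passed to setZooKeeperConnect() ends up in ZkHosts, which expects a ZooKeeper connect string of the form host:port, with several servers separated by commas. If you only have a list of host names and a port (as with "storm.zookeeper.servers" and "storm.zookeeper.port"), something like the sketch below can build it. ZkConnectString and build() are names I made up for illustration, they are not part of Storm or Kafka, and the sketch assumes every server listens on the same port.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Storm): builds a ZooKeeper connect string.
final class ZkConnectString {

    private ZkConnectString() {
    }

    // Appends the shared client port to each host and joins the results
    // with commas, which is the format a ZooKeeper connect string uses.
    static String build(List<String> hosts, int port) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one ZooKeeper host is required");
        }
        return hosts.stream()
                .map(host -> host.trim() + ":" + port)
                .collect(Collectors.joining(","));
    }
}
```

You could then call stormConfig.setZooKeeperConnect(ZkConnectString.build(hosts, 2181)) before asking for the spout.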
Upvotes: 0
Reputation: 1774
You can use the overloaded LocalCluster constructor, which takes the ZooKeeper host and port:
LocalCluster cluster = new LocalCluster("localhost", 2181L);
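Since this constructor relies on a ZooKeeper that is already running, it may be worth checking in your test setup that something is actually listening on that host and port before creating the cluster. Below is a minimal sketch using only the JDK; ZkProbe and isListening() are hypothetical names, not Storm API, and a successful TCP connect only shows that the port is open, not that the server behind it is a healthy ZooKeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Storm): checks that a TCP port accepts connections.
final class ZkProbe {

    private ZkProbe() {
    }

    // Returns true if a TCP connection to host:port succeeds within the timeout,
    // false if the connection is refused, times out, or fails for another I/O reason.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, a test could fail fast with a clear message if ZkProbe.isListening("localhost", 2181, 1000) returns false, and only then call new LocalCluster("localhost", 2181L).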
Upvotes: 1