Reputation: 371
So I have set up a storm spout coming from kafka and a bolt writing to the HDFS. This all works fine. I now want to add a new bolt which write to Hbase. For some reason, my application is not picking up the hbase configuration stuff and I get the following error:
java.lang.IllegalArgumentException: HBase configuration not found using key 'null'
at org.apache.storm.hbase.bolt.AbstractHBaseBolt.prepare(AbstractHBaseBolt.java:58) ~[storm-hbase-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$fn__5697$fn__5710.invoke(executor.clj:732) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-04-16 02:05:44 b.s.d.executor [ERROR]
java.lang.IllegalArgumentException: HBase configuration not found using key 'null'
at org.apache.storm.hbase.bolt.AbstractHBaseBolt.prepare(AbstractHBaseBolt.java:58) ~[storm-hbase-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$fn__5697$fn__5710.invoke(executor.clj:732) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-04-16 02:05:44 o.a.h.u.NativeCodeLoader [WARN] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-04-16 02:05:44 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:322) [storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__6109$fn__6110.invoke(worker.clj:495) [storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.daemon.executor$mk_executor_data$fn__5530$fn__5531.invoke(executor.clj:245) [storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:475) [storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
When I look at the code it shows the following block where the error is occuring:
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
final Configuration hbConfig = HBaseConfiguration.create();
Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
if(conf == null) {
throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
}
It looks like configKey isn't getting set anywhere so I tried to set it ion the HBaseBolt method as below:
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("CustomerId")
.withColumnFields(new Fields("CustomerId"))
.withCounterFields(new Fields("Count"))
.withColumnFamily("cf1");
HBaseBolt hbase = new HBaseBolt("Customer", mapper).withConfigKey("/etc/hbase/conf/hbase-site.xml");
builder.setBolt("HBASE_BOLT", hbase, 1)
.fieldsGrouping("stormspout", new Fields("CustomerId"));
Didn't seem to do anything though as I am still getting the same error.... Anyone have any suggestions?! It feels like its just not picking up my hbase-site.xml file but I'm not sure why not...
Upvotes: 1
Views: 1627
Reputation: 8937
You should set the configuration section name for your HBase settings in your configuration of topology:
Config cfg = new Config();
Map<String, String> HBConfig = Maps.newHashMap();
HBConfig.put(somekey,somevalue);
cfg.put("HBCONFIG",HBConfig);
StormSubmitter.submitTopology(TOPOLOGY_NAME, cfg, builder.createTopology());
Then in your HBase bolt configuration set this key as a bolt configuration key:
HBaseBolt bolt = new HBaseBolt("table_name", mapper).withConfigKey("HBCONFIG");
Upvotes: 0
Reputation: 371
I actually ended up writing my own hbasebolt that implements IRichBolt. I then did an override on the prepare method and built the following configuration which seemed to solve my problem :-)
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "sandbox.hortonworks.com");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.session.timeout", "1200000");
conf.set("hbase.zookeeper.property.tickTime", "6000");
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
conf.set("hbase.coprocessor.region.classes", "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor");
conf.set("hbase.coprocessor.master.classes", "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor");
admin = new HBaseAdmin(conf);
table = new HTable(conf, _tableName);
Upvotes: 0
Reputation: 11
After lots of work, I finally got this to work!!
In your topology createConfig method, add
Map<String, String> HBConfig = Maps.newHashMap();
HBConfig.put("hbase.rootdir","hdfs://<IP Address>:8020/hbase");
conf.put("HBCONFIG",HBConfig);
When you instantiate HBaseBolt, do so using .withConfigKey("HBCONFIG")
Upvotes: 1