Reputation: 6006
I'm trying to load some data from HDFS into HBase as follows:
String dir = "/tmp/eloued";
Configuration config = HBaseConfiguration.create();
config.set(SequenceFileInputFormat.INPUT_DIR, dir);
// register the HBase serializers so that Put (a Mutation) values can be
// read and written through Hadoop's serialization framework
config.setStrings("io.serializations", config.get("io.serializations"),
        MutationSerialization.class.getName(),
        ResultSerialization.class.getName());
JavaPairRDD<ImmutableBytesWritable, Put> input = context.newAPIHadoopRDD(
        config, SequenceFileInputFormat.class,
        ImmutableBytesWritable.class, Put.class);
JobConf jobConfig = new JobConf(config, this.getClass());
jobConfig.setOutputFormat(org.apache.hadoop.hbase.mapred.TableOutputFormat.class);
jobConfig.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName);
input.saveAsHadoopDataset(jobConfig);
Inside the dir directory I have two folders, as follows:
drwxr-xr-x - hadoop hadoop 0 2017-05-26 06:40 /tmp/eloued/object
-rw-r--r-- 3 hadoop hadoop 0 2017-05-26 06:40 /tmp/eloued/object/_SUCCESS
-rw-r--r-- 3 hadoop hadoop 896 2017-05-26 06:40 /tmp/eloued/object/part-r-00000
drwxr-xr-x - hadoop hadoop 0 2017-05-26 06:40 /tmp/eloued/status
-rw-r--r-- 3 hadoop hadoop 0 2017-05-26 06:40 /tmp/eloued/status/_SUCCESS
-rw-r--r-- 3 hadoop hadoop 111 2017-05-26 06:40 /tmp/eloued/status/part-r-00000
Now when I run the code above I get the following exception:
java.io.FileNotFoundException: File does not exist: hdfs://tmp/eloued/object/data
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:67)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:385)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:115)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopDataset(JavaPairRDD.scala:836)
at com.ddn.cloud.replication.storage.SparkConsumerJob.run(SparkConsumerJob.java:46)
at dz.lab.HBaseSparkTest.testConsumerProcessObject(NSReplicationHBaseSparkTest.java:223)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Why is the suffix data appended to the path that Spark tries to read from? The strange thing is that when the dir folder contains only one subfolder, no exception is raised and the data is correctly stored in HBase.
Upvotes: 0
Views: 349
Reputation: 6006
I simply had to specify the individual folders, separated by commas. So instead of:
config.set(SequenceFileInputFormat.INPUT_DIR, "/tmp/eloued");
I should have done:
config.set(SequenceFileInputFormat.INPUT_DIR, "/tmp/eloued/object,/tmp/eloued/status");
And it works fine. The data suffix comes from SequenceFileInputFormat.listStatus(): any directory it finds among its inputs is replaced by <dir>/data, on the assumption that the directory is a MapFile (a MapFile is a directory containing a data file and an index file). So with /tmp/eloued as the input directory, object and status were each mistaken for MapFiles.
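If the set of subfolders is not known in advance, one way to build that comma-separated list is to enumerate the children with the FileSystem API. A minimal sketch, assuming the same config object as above and a Java 8+ runtime (the /tmp/eloued path is the one from the question):
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Gather every subdirectory of /tmp/eloued and register them all as input
// directories, so SequenceFileInputFormat never sees a directory entry
// that it would mistake for a MapFile.
FileSystem fs = FileSystem.get(config);
List<String> inputDirs = new ArrayList<>();
for (FileStatus status : fs.listStatus(new Path("/tmp/eloued"))) {
    if (status.isDirectory()) {
        inputDirs.add(status.getPath().toString());
    }
}
config.set(SequenceFileInputFormat.INPUT_DIR, String.join(",", inputDirs));
Equivalently, FileInputFormat.addInputPath(Job, Path) can be called once per folder; it maintains the same comma-separated property internally.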
Upvotes: 0