Reputation: 65
I have the following Java Spark Hive example, which can be found in the official apache/spark GitHub repository. I have spent a lot of time trying to work out how to run the example in my Hortonworks Hadoop Sandbox, without success.
Currently, I am doing the following:
Setting the SparkSession master to local, replacing spark.sql.warehouse.dir with hive.metastore.uris, and using thrift://localhost:9083 (as shown in the Hive config in Ambari) in place of warehouseLocation:
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .master("local[*]")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate();
Then I replace
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
with a path to HDFS, where I have uploaded kv1.txt:
spark.sql("LOAD DATA LOCAL INPATH 'hdfs:///tmp/kv1.txt' INTO TABLE src");
The last step is to build the JAR by running mvn package against the pom.xml. It builds without errors and gives me original-spark-examples_2.11-2.3.0-SNAPSHOT.jar.
I copy the JAR over to the Hadoop Sandbox:
scp -P 2222 ./target/original-spark-examples_2.11-2.3.0-SNAPSHOT.jar [email protected]:/root
and use spark-submit to run the code:
/usr/hdp/current/spark2-client/bin/spark-submit --class "JavaSparkHiveExample" --master local ./original-spark-examples_2.11-2.3.0-SNAPSHOT.jar
This returns the following error:
[root@sandbox-hdp ~]# /usr/hdp/current/spark2-client/bin/spark-submit --class "JavaSparkHiveExample" --master local ./original-spark-examples_2.11-2.3.0-SNAPSHOT.jar
java.lang.ClassNotFoundException: JavaSparkHiveExample
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:739)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[root@sandbox-hdp ~]#
...and here I am totally stuck. I am probably missing some steps needed to prepare the code to run.
I would be very happy to get some help running this code on my Hadoop Sandbox. I was able to run the JavaWordCount.java Spark example just fine, but with this one I am totally stuck. Thanks :)
Complete JavaSparkHiveExample.java:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.examples.sql.hive;

// $example on:spark_hive$
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// $example off:spark_hive$

public class JavaSparkHiveExample {

  // $example on:spark_hive$
  public static class Record implements Serializable {
    private int key;
    private String value;

    public int getKey() {
      return key;
    }

    public void setKey(int key) {
      this.key = key;
    }

    public String getValue() {
      return value;
    }

    public void setValue(String value) {
      this.value = value;
    }
  }
  // $example off:spark_hive$

  public static void main(String[] args) {
    // $example on:spark_hive$
    // warehouseLocation points to the default location for managed databases and tables
    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate();

    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

    // Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show();
    // +---+-------+
    // |key| value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show();
    // +--------+
    // |count(1)|
    // +--------+
    // | 500 |
    // +--------+

    // The results of SQL queries are themselves DataFrames and support all normal functions.
    Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

    // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
    Dataset<String> stringsDS = sqlDF.map(
        (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
        Encoders.STRING());
    stringsDS.show();
    // +--------------------+
    // | value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    // You can also use DataFrames to create temporary views within a SparkSession.
    List<Record> records = new ArrayList<>();
    for (int key = 1; key < 100; key++) {
      Record record = new Record();
      record.setKey(key);
      record.setValue("val_" + key);
      records.add(record);
    }
    Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
    recordsDF.createOrReplaceTempView("records");

    // Queries can then join DataFrames data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // | 2| val_2| 2| val_2|
    // | 2| val_2| 2| val_2|
    // | 4| val_4| 4| val_4|
    // ...
    // $example off:spark_hive$

    spark.stop();
  }
}
Upvotes: 1
Views: 2317
Reputation: 192023
The class name passed to --class always needs to be fully qualified, that is, it must include the package declared at the top of the source file:
--class org.apache.spark.examples.sql.hive.JavaSparkHiveExample
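The stack trace in the question shows spark-submit resolving the --class value through Class.forName, which looks a class up by its fully qualified name only. The following is a minimal sketch of that behavior. It uses java.util.ArrayList as a stand-in because the Spark example class is not assumed to be on the classpath here, and QualifiedNameDemo and canLoad are hypothetical names chosen for illustration.

```java
public class QualifiedNameDemo {

    // Returns true if the class loader can resolve the given name.
    // Class.forName expects the package-qualified name, not the simple name.
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Simple name only: fails like "JavaSparkHiveExample" in the question.
        System.out.println("ArrayList -> " + canLoad("ArrayList"));
        // Package-qualified name: resolves.
        System.out.println("java.util.ArrayList -> " + canLoad("java.util.ArrayList"));
    }
}
```

For a class you have compiled yourself, calling getName() on its Class object (for example JavaSparkHiveExample.class.getName()) returns the string in the form that --class expects.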
spark.sql("LOAD DATA LOCAL INPATH 'hdfs:///tmp/kv1.txt' INTO TABLE src"); cannot read from HDFS. How could I solve this?
Few options
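One option that may be worth trying (a sketch, and not necessarily one of the options this answer originally listed): in Hive's LOAD DATA syntax, the LOCAL keyword tells Hive to look for the file on the local file system, so a path with an hdfs:// scheme would be loaded without LOCAL. The helper below only builds the statement string that would then be passed to spark.sql(...). LoadDataSqlDemo and loadDataSql are hypothetical names, and treating a path without a scheme as local is an assumption that matches the original example's relative path.

```java
import java.net.URI;

public class LoadDataSqlDemo {

    // Hypothetical helper: emit LOCAL only when the path has no scheme
    // or an explicit file: scheme (assumed here to mean "local file system").
    static String loadDataSql(String path, String table) {
        String scheme = URI.create(path).getScheme();
        boolean local = scheme == null || scheme.equals("file");
        return "LOAD DATA " + (local ? "LOCAL " : "")
            + "INPATH '" + path + "' INTO TABLE " + table;
    }

    public static void main(String[] args) {
        // HDFS path from the question: no LOCAL keyword.
        System.out.println(loadDataSql("hdfs:///tmp/kv1.txt", "src"));
        // Relative path from the original example: LOCAL is kept.
        System.out.println(loadDataSql("examples/src/main/resources/kv1.txt", "src"));
    }
}
```

Note that, as far as Hive documents it, loading without LOCAL moves the file from its HDFS location into the table's directory instead of copying it, so /tmp/kv1.txt would likely no longer be there afterwards.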
Upvotes: 1