It'sOK

Reputation: 65

Trouble running Java Spark Hive Example

I have the following Java Spark Hive example, as found in the official apache/spark GitHub repository. I have spent a lot of time trying to run it in my Hortonworks Hadoop Sandbox, without success.

Currently, I am doing the following:

Setting the SparkSession master to local, and replacing the spark.sql.warehouse.dir config with hive.metastore.uris, set to thrift://localhost:9083 (the value I see in the Hive config in Ambari):

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .master("local[*]")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate();

Then I replace spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

with an HDFS path where I have uploaded kv1.txt:

spark.sql("LOAD DATA LOCAL INPATH 'hdfs:///tmp/kv1.txt' INTO TABLE src");

The last step is to build the JAR with mvn package against the pom.xml. It builds without errors and gives me original-spark-examples_2.11-2.3.0-SNAPSHOT.jar.

I copy the JAR over to the Hadoop Sandbox:

scp -P 2222 ./target/original-spark-examples_2.11-2.3.0-SNAPSHOT.jar [email protected]:/root

and use spark-submit to run the code:

/usr/hdp/current/spark2-client/bin/spark-submit --class "JavaSparkHiveExample" --master local ./original-spark-examples_2.11-2.3.0-SNAPSHOT.jar

which returns the following error:

[root@sandbox-hdp ~]# /usr/hdp/current/spark2-client/bin/spark-submit --class "JavaSparkHiveExample" --master local ./original-spark-examples_2.11-2.3.0-SNAPSHOT.jar
java.lang.ClassNotFoundException: JavaSparkHiveExample
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:739)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[root@sandbox-hdp ~]#

...and here I am totally stuck; I am probably missing some steps needed to prepare the code to run.

I would be very happy to get some help running this code on my Hadoop Sandbox. I was able to run the JavaWordCount.java Spark example just fine, but with this one I am stuck. Thanks :)

Complete JavaSparkHiveExample.java:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.examples.sql.hive;

// $example on:spark_hive$
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// $example off:spark_hive$

public class JavaSparkHiveExample {

  // $example on:spark_hive$
  public static class Record implements Serializable {
    private int key;
    private String value;

    public int getKey() {
      return key;
    }

    public void setKey(int key) {
      this.key = key;
    }

    public String getValue() {
      return value;
    }

    public void setValue(String value) {
      this.value = value;
    }
  }
  // $example off:spark_hive$

  public static void main(String[] args) {
    // $example on:spark_hive$
    // warehouseLocation points to the default location for managed databases and tables
    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate();

    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

    // Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show();
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show();
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // The results of SQL queries are themselves DataFrames and support all normal functions.
    Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

    // The items in DataFrames are of type Row, which lets you access each column by ordinal.
    Dataset<String> stringsDS = sqlDF.map(
        (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
        Encoders.STRING());
    stringsDS.show();
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    // You can also use DataFrames to create temporary views within a SparkSession.
    List<Record> records = new ArrayList<>();
    for (int key = 1; key < 100; key++) {
      Record record = new Record();
      record.setKey(key);
      record.setValue("val_" + key);
      records.add(record);
    }
    Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
    recordsDF.createOrReplaceTempView("records");

    // Queries can then join DataFrames data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // ...
    // $example off:spark_hive$

    spark.stop();
  }
}

Upvotes: 1

Views: 2317

Answers (1)

OneCricketeer

Reputation: 192023

The class name always needs to be fully qualified.

--class org.apache.spark.examples.sql.hive.JavaSparkHiveExample
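With that fix, the full command from the question would look like this (jar path and master unchanged):

/usr/hdp/current/spark2-client/bin/spark-submit --class org.apache.spark.examples.sql.hive.JavaSparkHiveExample --master local ./original-spark-examples_2.11-2.3.0-SNAPSHOT.jar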

spark.sql("LOAD DATA LOCAL INPATH 'hdfs:///tmp/kv1.txt' INTO TABLE src"); cannot read from the hdfs, how could I solve this

A few options:

  1. Remove LOCAL... that keyword means the path is on the local filesystem, not HDFS.
  2. Build an EXTERNAL TABLE over the existing file in Hive, then query it from Spark.
  3. Use Spark to read the file directly into a Dataset... It is not clear whether you need Hive, but if you do, you can use Spark to write the Dataset to a Hive table (see the sketch after this list).
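Minimal sketches of options 1 and 3 follow. The table name src and the path hdfs:///tmp/kv1.txt come from the question; the \u0001 field separator is an assumption based on Hive's default delimiter, so adjust it to match your file.

// Option 1: drop the LOCAL keyword so the path is resolved against HDFS
spark.sql("LOAD DATA INPATH 'hdfs:///tmp/kv1.txt' INTO TABLE src");

// Option 3: read the file directly into a Dataset, then save it as a table.
// Assumes kv1.txt uses Hive's default field delimiter \u0001 (ctrl-A);
// change the "sep" option if your file is delimited differently.
Dataset<Row> kv = spark.read()
    .option("sep", "\u0001")
    .csv("hdfs:///tmp/kv1.txt")
    .toDF("key", "value");
kv.write().mode("overwrite").saveAsTable("src");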

Upvotes: 1
