Reputation: 1647
I am new to Spark and Hive, and my goal is to load a delimited file (let's say CSV) into a Hive table. After a bit of reading I found out that the path to load the data into Hive is CSV -> DataFrame -> Hive. (Please correct me if I am wrong.)
CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus
I read the CSV file using the command below:
val csv = sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39
Now I am trying to convert this RDD to a DataFrame using the code below:
scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]
employee is a case class and I am using it as a schema definition.
case class employee(eid: String, name: String, salary: String, destination: String)
However, when I run df.show I get the error below:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 22, user.hostname): scala.MatchError: [Ljava.lang.String;@88ba3cb (of class [Ljava.lang.String;)
I was expecting a DataFrame as output. I suspect I am getting this error because the values in the RDD are printed in [Ljava.lang.String;@88ba3cb form and I would need to use mkString to get the actual values, but I am not able to find out how to do it. I appreciate your time.
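(Editor's note: since mkString came up, a quick way to inspect what the RDD actually holds is a sketch like the one below; this is only for debugging and does not fix the error itself.)
// Debugging sketch: join each parsed row back into one string and print it
csv.map(arr => arr.mkString(" | ")).take(5).foreach(println)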
Upvotes: 2
Views: 12811
Reputation: 16076
Use map on the array elements, not on the array:
val csv = sc.textFile("employee_data.txt")
  .map(line => line
    .split(",")
    .map(_.trim)   // trim each field of the row
  )
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
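Note that calling toDF() on an RDD requires Spark's implicit conversions to be in scope. The spark-shell imports them for you; in a standalone Spark 1.5 application you would need something like this sketch:
// Assumed setup for a standalone app; spark-shell already provides sqlContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._   // brings rdd.toDF() into scope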
But why are you reading the CSV and then converting the RDD to a DataFrame at all? Spark 1.5 can already read CSV via the spark-csv package:
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")     // employee_data.txt has no header row
  .option("inferSchema", "true")
  .option("delimiter", ",")      // the sample data is comma-delimited
  .load("employee_data.txt")
Upvotes: 2
Reputation: 5572
If you fix your case class then it should work:
scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee
scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24
scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
Otherwise you could convert the String to an Int:
scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee
scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]
scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
However, the best solution would be to use spark-csv (which would treat the salary as an Int as well).
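A rough sketch of that spark-csv approach (assuming the com.databricks:spark-csv package is on the classpath; the column names passed to toDF are chosen here, since the file has no header row):
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("inferSchema", "true")   // lets eid and salary come out as int
  .load("data.txt")
  .toDF("eid", "name", "salary", "destination")   // no header, so name the columns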
Also note that the error was thrown only when you ran df.show, because everything was being lazily evaluated up until that point. df.show is an action which causes all of the queued transformations to execute (see this article for more).
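As a tiny illustration of that laziness, nothing in the first statement below touches the file; only the action on the last line does:
// map is a lazy transformation: no file is read and no rows are parsed here
val parsed = sc.textFile("data.txt").map(_.split(",").map(_.trim))
// count is an action: the read and the map actually execute here, which is
// why errors in the pipeline only surface at this point
parsed.count()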
Upvotes: 3
Reputation: 18022
As you said in your comment, your case class employee (which, by convention, should be named Employee) receives an Int as the first argument of its constructor, but you are passing a String. Thus, you should either convert it to an Int before instantiating, or modify your case class, defining eid as a String.
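A minimal sketch of the first option, reusing txtRDD from the answer above and converting eid before constructing:
case class Employee(eid: Int, name: String, salary: String, destination: String)

val df = txtRDD.map { case Array(s0, s1, s2, s3) => Employee(s0.toInt, s1, s2, s3) }.toDF()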
Upvotes: 1