Explorer
Explorer

Reputation: 1647

RDD[Array[String]] to Dataframe

I am new to Spark and Hive and my goal is to load a delimited(lets say csv) to Hive table. After a bit of reading I found out that the path to load the data into Hive is csv->dataframe->Hive.(Please correct me if I am wrong).

CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus

I read the csv file be using below command:

val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39

Now I am trying to convert this RDD to Dataframe and using below code:

scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]

employee is a case class and I am using it as a schema definition.

case class employee(eid: String, name: String, salary: String, destination: String)

However when I do df.show I am getting below error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 22, user.hostname): scala.MatchError: [Ljava.lang.String;@88ba3cb (of class [Ljava.lang.String;)

I was expecting a dataframe as a output. I know why I might be getting this error because the values in RDD are stored in Ljava.lang.String;@88ba3cb format and I need to use mkString to get the actual values but I am not able to find how to do it. I appreciate your time.

Upvotes: 2

Views: 12811

Answers (3)

T. Gawęda
T. Gawęda

Reputation: 16076

Use map on array elements, not on array:

val csv = sc.textFile("employee_data.txt")
    .map(line => line
                     .split(",")
                     .map(e => e.map(_.trim))
     )
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()

But, why you are reading CSV and then converting RDD to DF? Spark 1.5 already can read CSV via spark-csv package:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("delimiter", ";") 
    .load("employee_data.txt")

Upvotes: 2

evan.oman
evan.oman

Reputation: 5572

If you fix your case class then it should work:

scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee

scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24

scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

Otherwise you could convert the String to an Int:

scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee

scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]

scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

However the best solution would be to use spark-csv (which would treat the salary as an Int as well).

Also note that the error was thrown when you ran df.show because everything was being lazily evaluated up until that point. df.show is an action which will cause all of the queued transformations to execute (see this article for more).

Upvotes: 3

Alberto Bonsanto
Alberto Bonsanto

Reputation: 18022

As you said in your comment, your case class employee, which should be named Employee, receives an Int as first argument of its constructor, but you are passing a String. Thus, you should convert it to an Int before instantiating or modify your case defining eid as a String.

Upvotes: 1

Related Questions