Reputation: 35
I'm trying to convert an RDD into a DataFrame without using a case class. The CSV file looks like this:
3,193080,De Gea
0,158023,L. Messi
4,192985,K. De Bruyne
1,20801,Cristiano Ronaldo
2,190871,Neymar Jr
val players = sc.textFile("/Projects/Downloads/players.csv").map(line => line.split(',')).map(r => Row(r(1),r(2),r(3)))
# players: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[230] at map at <console>:34
val schema = StructType(List(StructField("id",IntegerType),StructField("age",IntegerType),StructField("name",StringType)))
# schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(age,IntegerType,true), StructField(name,StringType,true))
val playersDF = spark.createDataFrame(players,schema)
# playersDF: org.apache.spark.sql.DataFrame = [id: int, age: int ... 1 more field]
Everything goes well until I try, for example, to do a playersDF.show:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
What can I do?
Upvotes: 0
Views: 3906
Reputation: 1
//Input
StudentId,Name,Address
101,Shoaib,Anwar Layout
102,Shahbaz,Sara padlya
103,Fahad,Munredy padlya
104,Sana,Tannery Road
105,Zeeshan,Muslim colony
106,Azeem,Khusal nagar
107,Nazeem,KR puram
import org.apache.spark.sql.{Row, SQLContext, types}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
object SparkCreateDFWithRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Creating DF WITH RDD").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    val rdd = sc.textFile("/home/cloudera/Desktop/inputs/studentDetails1.csv")
    // Skip the header row, then parse each line into a Row with the proper types.
    val header = rdd.first()
    val rddData = rdd.filter(x => x != header).map(x => {
      val arr = x.split(",")
      Row(arr(0).toInt, arr(1), arr(2))
    })
    val schemas = StructType(Array(
      StructField("StudentId", IntegerType, false),
      StructField("StudentName", StringType, false),
      StructField("StudentAddress", StringType, true)))
    val df = sqlcontext.createDataFrame(rddData, schemas)
    df.printSchema()
    df.show()
  }
}
+---------+-----------+--------------+
|StudentId|StudentName|StudentAddress|
+---------+-----------+--------------+
| 101| Shoaib| Anwar Layout|
| 102| Shahbaz| Sara padlya|
| 103| Fahad|Munredy padlya|
| 104| Sana| Tannery Road|
| 105| Zeeshan| Muslim colony|
| 106| Azeem| Khusal nagar|
| 107| Nazeem| KR puram|
+---------+-----------+--------------+
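As a side note, SQLContext has been deprecated since Spark 2.0; a minimal sketch of the same setup through a SparkSession (only the session creation changes, everything from the textFile call onward stays the same) would be:
import org.apache.spark.sql.SparkSession
// SparkSession replaces the SparkConf/SparkContext/SQLContext boilerplate in Spark 2.x+.
val spark = SparkSession.builder()
  .appName("Creating DF WITH RDD")
  .master("local")
  .getOrCreate()
val rdd = spark.sparkContext.textFile("/home/cloudera/Desktop/inputs/studentDetails1.csv")
// ...build rddData and schemas exactly as above, then:
// val df = spark.createDataFrame(rddData, schemas)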
Upvotes: 0
Reputation: 8406
I think the best option is to provide a schema and read the CSV file using Spark's built-in CSV reader.
import org.apache.spark.sql.types._
val playerSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("age", IntegerType, true),
  StructField("name", StringType, true)
))
val players = spark
  .sqlContext
  .read
  .format("csv")
  .option("delimiter", ",")
  .schema(playerSchema)
  .load("/mypath/players.csv")
Here's the result:
scala> players.show
+---+------+-----------------+
| id| age| name|
+---+------+-----------------+
| 3|193080| De Gea|
| 0|158023| L. Messi|
| 4|192985| K. De Bruyne|
| 1| 20801|Cristiano Ronaldo|
| 2|190871| Neymar Jr|
+---+------+-----------------+
scala> players.printSchema()
root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
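If you would rather not spell the schema out by hand, the CSV reader can also infer the column types. A minimal sketch (inferSchema makes Spark sample the data, so it can be slower on large files, and the columns must be named explicitly since the file has no header):
val playersInferred = spark.read
  .format("csv")
  .option("delimiter", ",")
  .option("inferSchema", "true") // Spark samples the file and guesses each column's type
  .load("/mypath/players.csv")
  .toDF("id", "age", "name")     // the file has no header row, so name the columns here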
Upvotes: 1
Reputation: 215117
You have two issues:
1) Your index is off; Scala is 0-based, so Row(r(1), r(2), r(3)) should be Row(r(0), r(1), r(2)).
2) line.split returns Array[String] while your schema indicates the first and second fields should be integers. You need to cast them to integers before creating the data frame.
Basically this is how you should create players:
val players = rdd.map(line => line.split(","))
  .map(r => Row(r(0).toInt, r(1).toInt, r(2)))
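Putting both fixes together with the schema from the question, a minimal end-to-end sketch (using the same file path as the question) looks like:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val players = sc.textFile("/Projects/Downloads/players.csv")
  .map(line => line.split(","))
  .map(r => Row(r(0).toInt, r(1).toInt, r(2))) // 0-based indices; numeric fields converted to Int
val schema = StructType(List(
  StructField("id", IntegerType),
  StructField("age", IntegerType),
  StructField("name", StringType)))
val playersDF = spark.createDataFrame(players, schema)
playersDF.show() // no more "not a valid external type" error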
Upvotes: 1