user9609362

Reputation: 35

Errors After Converting RDD to Dataframe: "java.lang.String is not a valid external type for schema of int"

I'm trying to convert a RDD into a Dataframe without using a case class. The csv file looks like this:

3,193080,De Gea
0,158023,L. Messi
4,192985,K. De Bruyne
1,20801,Cristiano Ronaldo
2,190871,Neymar Jr


val players = sc.textFile("/Projects/Downloads/players.csv").map(line => line.split(',')).map(r => Row(r(1),r(2),r(3)))
# players: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[230] at map at <console>:34

val schema = StructType(List(StructField("id",IntegerType),StructField("age",IntegerType),StructField("name",StringType)))
# schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(age,IntegerType,true), StructField(name,StringType,true))

val playersDF = spark.createDataFrame(players,schema)
# playersDF: org.apache.spark.sql.DataFrame = [id: int, age: int ... 1 more field]

Everything goes well until I call, for example, playersDF.show:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int 

What can I do?

Upvotes: 0

Views: 3906

Answers (3)

Azeezullah shariff

Reputation: 1

Cast the numeric field to Int when building each Row so that it matches the IntegerType in the schema. Full example:

// Input (studentDetails1.csv)
StudentId,Name,Address
101,Shoaib,Anwar Layout
102,Shahbaz,Sara padlya
103,Fahad,Munredy padlya
104,Sana,Tannery Road
105,Zeeshan,Muslim colony
106,Azeem,Khusal nagar
107,Nazeem,KR puram

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

object SparkCreateDFWithRDD {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Creating DF WITH RDD").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)

    val rdd = sc.textFile("/home/cloudera/Desktop/inputs/studentDetails1.csv")

    // Drop the header line, then cast StudentId to Int so each Row
    // matches the IntegerType declared in the schema below.
    val header = rdd.first()
    val rddData = rdd.filter(x => x != header).map { x =>
      val arr = x.split(",")
      Row(arr(0).toInt, arr(1), arr(2))
    }

    val schemas = StructType(Array(
      StructField("StudentId", IntegerType, false),
      StructField("StudentName", StringType, false),
      StructField("StudentAddress", StringType, true)))

    val df = sqlcontext.createDataFrame(rddData, schemas)

    df.printSchema()
    df.show()
  }
}

+---------+-----------+--------------+
|StudentId|StudentName|StudentAddress|
+---------+-----------+--------------+
|      101|     Shoaib|  Anwar Layout|
|      102|    Shahbaz|   Sara padlya|
|      103|      Fahad|Munredy padlya|
|      104|       Sana|  Tannery Road|
|      105|    Zeeshan| Muslim colony|
|      106|      Azeem|  Khusal nagar|
|      107|     Nazeem|      KR puram|
+---------+-----------+--------------+
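As a side note, SQLContext is deprecated since Spark 2.0; a sketch of the same setup with SparkSession (the builder values here mirror the example above, everything else is an assumption):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical SparkSession-based setup (Spark 2.x+), replacing
// the SparkConf/SparkContext/SQLContext trio used above.
val spark = SparkSession.builder()
  .appName("Creating DF WITH RDD")
  .master("local")
  .getOrCreate()

// rddData and schemas are the same values built in the answer above:
// val df = spark.createDataFrame(rddData, schemas)
```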

Upvotes: 0

Leonardo Herrera

Reputation: 8406

I think the best option is to provide a schema and read the CSV file using Spark's built-in CSV reader.

import org.apache.spark.sql.types._

val playerSchema = StructType(Array(
    StructField("id", IntegerType, true),
    StructField("age", IntegerType, true),
    StructField("name", StringType, true)
))

val players = spark
    .read
    .format("csv")
    .option("delimiter", ",")
    .schema(playerSchema)
    .load("/mypath/players.csv")

Here's the result:

scala> players.show
+---+------+-----------------+
| id|   age|             name|
+---+------+-----------------+
|  3|193080|           De Gea|
|  0|158023|         L. Messi|
|  4|192985|     K. De Bruyne|
|  1| 20801|Cristiano Ronaldo|
|  2|190871|        Neymar Jr|
+---+------+-----------------+

scala> players.printSchema()
root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

scala>
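If you would rather not hand-write the schema at all, the CSV reader can also infer the column types; a sketch (not part of the original answer; the column names are supplied via toDF because the file has no header row):

```scala
// Let Spark infer int/string types by sampling the file,
// then attach column names, since the file has no header.
val inferred = spark.read
  .option("inferSchema", "true")
  .csv("/mypath/players.csv")
  .toDF("id", "age", "name")
```

Schema inference triggers an extra pass over the data, so providing an explicit schema as above is still preferable for large files.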

Upvotes: 1

akuiper

Reputation: 215117

You have two issues:

1) Your indices are off; Scala arrays are 0-based, so Row(r(1),r(2),r(3)) should be Row(r(0),r(1),r(2)).

2) line.split returns Array[String], while your schema says the first two fields should be integers. You need to cast them to Int before creating the DataFrame.

Basically this is how you should create players:

val players = sc.textFile("/Projects/Downloads/players.csv")
                .map(line => line.split(','))
                .map(r => Row(r(0).toInt, r(1).toInt, r(2)))
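If the file may contain malformed lines, .toInt will throw at runtime just like the schema mismatch did. A defensive variant (my own sketch, not part of the answer above; parseFields is a hypothetical helper) parses each line into an Option and drops the failures:

```scala
import scala.util.Try

// Hypothetical helper: parse one CSV line into (id, age, name),
// returning None when a field is missing or a number doesn't parse.
def parseFields(line: String): Option[(Int, Int, String)] = Try {
  val r = line.split(",")
  (r(0).trim.toInt, r(1).trim.toInt, r(2).trim)
}.toOption

// In the Spark job this would be used as:
//   val players = sc.textFile("/Projects/Downloads/players.csv")
//                   .flatMap(parseFields)
//                   .map { case (id, age, name) => Row(id, age, name) }
```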

Upvotes: 1
