padmanabh pande

Reputation: 427

How to convert RDD[Array[Any]] to DataFrame?

I have an RDD[Array[Any]] as follows:

1556273771,Mumbai,1189193,1189198,0.56,-1,India,Australia,1571215104,1571215166
8374749403,London,1189193,1189198,0,1,India,England,4567362933,9374749392
7439430283,Dubai,1189193,1189198,0.76,-1,Pakistan,Sri Lanka,1576615684,4749383749

I need to convert this to a DataFrame with 10 columns, but I am new to Spark. Please let me know the simplest way to do this.

I am trying something similar to this code:

rdd_data.map{case Array(a,b,c,d,e,f,g,h,i,j) => (a,b,c,d,e,f,g,h,i,j)}.toDF()

Upvotes: 1

Views: 584

Answers (3)

André Machado

Reputation: 724

When you create a DataFrame, Spark needs to know the data type of each column. "Any" is just a way of saying that you don't know the variable's type. A possible solution is to cast each value to a specific type. This will of course fail if the specified cast is invalid.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rdd1 = spark.sparkContext.parallelize(
    Array(
        Array(1556273771L,"Mumbai",1189193,1189198 ,0.56,-1,"India",   "Australia",1571215104L,1571215166L),
        Array(8374749403L,"London",1189193,1189198 ,0   , 1,"India",   "England",  4567362933L,9374749392L),
        Array(7439430283L,"Dubai" ,1189193,1189198 ,0.76,-1,"Pakistan","Sri Lanka",1576615684L,4749383749L)
    ),1)
//rdd1: org.apache.spark.rdd.RDD[Array[Any]]

val rdd2 = rdd1.map(r => Row(
    r(0).toString.toLong, 
    r(1).toString, 
    r(2).toString.toInt, 
    r(3).toString.toInt, 
    r(4).toString.toDouble, 
    r(5).toString.toInt, 
    r(6).toString, 
    r(7).toString, 
    r(8).toString.toLong, 
    r(9).toString.toLong
))


val schema = StructType(
  List(
    StructField("col0", LongType, false),
    StructField("col1", StringType, false),
    StructField("col2", IntegerType, false),
    StructField("col3", IntegerType, false),
    StructField("col4", DoubleType, false),
    StructField("col5", IntegerType, false),
    StructField("col6", StringType, false),
    StructField("col7", StringType, false),
    StructField("col8", LongType, false),
    StructField("col9", LongType, false)
  )
)

val df = spark.createDataFrame(rdd2, schema)

df.show
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
|      col0|  col1|   col2|   col3|col4|col5|    col6|     col7|      col8|      col9|
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
|1556273771|Mumbai|1189193|1189198|0.56|  -1|   India|Australia|1571215104|1571215166|
|8374749403|London|1189193|1189198| 0.0|   1|   India|  England|4567362933|9374749392|
|7439430283| Dubai|1189193|1189198|0.76|  -1|Pakistan|Sri Lanka|1576615684|4749383749|
+----------+------+-------+-------+----+----+--------+---------+----------+----------+

df.printSchema
root
 |-- col0: long (nullable = false)
 |-- col1: string (nullable = false)
 |-- col2: integer (nullable = false)
 |-- col3: integer (nullable = false)
 |-- col4: double (nullable = false)
 |-- col5: integer (nullable = false)
 |-- col6: string (nullable = false)
 |-- col7: string (nullable = false)
 |-- col8: long (nullable = false)
 |-- col9: long (nullable = false)

Hope it helps

Upvotes: 2

Ryan Widmaier

Reputation: 8523

As the other posts mention, a DataFrame requires explicit types for each column, so you can't use Any. The easiest way I can think of would be to turn each row into a tuple of the right types, then use implicit DataFrame creation to convert it to a DataFrame. You were pretty close in your code; you just need to cast the elements to an acceptable type.

Basically toDF knows how to convert tuples (with accepted types) into a DF Row, and you can pass the column names into the toDF call.

For example:

// assumes the spark-shell, where sc is available and spark.implicits._ is already imported for toDF
val data = Array(1556273771, "Mumbai", 1189193, 1189198, 0.56, -1, "India,Australia", 1571215104, 1571215166)
val rdd = sc.parallelize(Seq(data))

val df = rdd.map {
    case Array(a,b,c,d,e,f,g,h,i) => (
        a.asInstanceOf[Int],
        b.asInstanceOf[String],
        c.asInstanceOf[Int],
        d.asInstanceOf[Int],
        e.toString.toDouble,
        f.asInstanceOf[Int],
        g.asInstanceOf[String],
        h.asInstanceOf[Int],
        i.asInstanceOf[Int]
    )
}.toDF("int1", "city", "int2", "int3", "float1", "int4", "country", "int5", "int6")

df.printSchema
df.show(100, false)


scala> df.printSchema
root
 |-- int1: integer (nullable = false)
 |-- city: string (nullable = true)
 |-- int2: integer (nullable = false)
 |-- int3: integer (nullable = false)
 |-- float1: double (nullable = false)
 |-- int4: integer (nullable = false)
 |-- country: string (nullable = true)
 |-- int5: integer (nullable = false)
 |-- int6: integer (nullable = false)


scala> df.show(100, false)
+----------+------+-------+-------+------+----+---------------+----------+----------+
|int1      |city  |int2   |int3   |float1|int4|country        |int5      |int6      |
+----------+------+-------+-------+------+----+---------------+----------+----------+
|1556273771|Mumbai|1189193|1189198|0.56  |-1  |India,Australia|1571215104|1571215166|
+----------+------+-------+-------+------+----+---------------+----------+----------+

Edit for 0 -> Double:

As André pointed out, if you start off with 0 as an Any it will be boxed as a java.lang.Integer, not a Scala Int, and is therefore not castable to a Scala Double. Converting it to a String first lets you then convert it to a Double as desired.
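A minimal sketch of that boxing behaviour, runnable in the spark-shell or any Scala REPL (not tied to the dataset above):

val v: Any = 0                 // the literal 0 gets boxed as a java.lang.Integer

// v.asInstanceOf[Double]      // would throw ClassCastException:
//                             // java.lang.Integer cannot be cast to java.lang.Double

v.toString.toDouble            // 0.0 -- going through String sidesteps the boxing issue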

Upvotes: 1

chlebek

Reputation: 2451

You can try the approach below; it's a bit tricky, but it avoids defining a schema. Map every Any to String so toDF() can handle it, create a DataFrame with a single array column, then create new columns by selecting each element from that array column.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Column
import spark.implicits._

// sample RDD[Array[Any]] with three values per row
val rdd: RDD[Array[Any]] = spark.range(5).rdd.map(s => Array(s, s + 1, s % 2))
val size = rdd.first().length

// for an array column, produce one (name, element) pair per index
def splitCol(col: Column): Seq[(String, Column)] =
  for (i <- 0 until size) yield ("_" + i, col(i))

rdd.map(_.map(_.toString))               // stringify every element so toDF can handle it
  .toDF("x")                             // single column "x" of type array<string>
  .select(splitCol('x).map(_._2): _*)    // one column per array element
  .toDF(splitCol('x).map(_._1): _*)      // rename the columns to _0, _1, ...
  .show()

+---+---+---+
| _0| _1| _2|
+---+---+---+
|  0|  1|  0|
|  1|  2|  1|
|  2|  3|  0|
|  3|  4|  1|
|  4|  5|  0|
+---+---+---+

Upvotes: 0
