Reputation: 427
I have an RDD[Array[Any]] as follows:
1556273771,Mumbai,1189193,1189198,0.56,-1,India,Australia,1571215104,1571215166
8374749403,London,1189193,1189198,0,1,India,England,4567362933,9374749392
7439430283,Dubai,1189193,1189198,0.76,-1,Pakistan,Sri Lanka,1576615684,4749383749
I need to convert this to a DataFrame with 10 columns, but I am new to Spark. Please let me know the simplest way to do this.
I am trying something similar to this code:
rdd_data.map{case Array(a,b,c,d,e,f,g,h,i,j) => (a,b,c,d,e,f,g,h,i,j)}.toDF()
Upvotes: 1
Views: 584
Reputation: 724
When you create a DataFrame, Spark needs to know the data type of each column. The Any type is just a way of saying that you don't know the variable's type. A possible solution is to cast each value to a specific type. This will of course fail if the specified cast is invalid.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rdd1 = spark.sparkContext.parallelize(
  Array(
    Array(1556273771L, "Mumbai", 1189193, 1189198, 0.56, -1, "India",    "Australia", 1571215104L, 1571215166L),
    Array(8374749403L, "London", 1189193, 1189198, 0,     1, "India",    "England",   4567362933L, 9374749392L),
    Array(7439430283L, "Dubai",  1189193, 1189198, 0.76, -1, "Pakistan", "Sri Lanka", 1576615684L, 4749383749L)
  ), 1)
// rdd1: org.apache.spark.rdd.RDD[Array[Any]]

// Cast every element to its concrete type and wrap each array in a Row
val rdd2 = rdd1.map(r => Row(
  r(0).toString.toLong,
  r(1).toString,
  r(2).toString.toInt,
  r(3).toString.toInt,
  r(4).toString.toDouble,
  r(5).toString.toInt,
  r(6).toString,
  r(7).toString,
  r(8).toString.toLong,
  r(9).toString.toLong
))

// Schema describing the 10 columns
val schema = StructType(
  List(
    StructField("col0", LongType, false),
    StructField("col1", StringType, false),
    StructField("col2", IntegerType, false),
    StructField("col3", IntegerType, false),
    StructField("col4", DoubleType, false),
    StructField("col5", IntegerType, false),
    StructField("col6", StringType, false),
    StructField("col7", StringType, false),
    StructField("col8", LongType, false),
    StructField("col9", LongType, false)
  )
)

val df = spark.createDataFrame(rdd2, schema)
df.show
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
| col0| col1| col2| col3|col4|col5| col6| col7| col8| col9|
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
|1556273771|Mumbai|1189193|1189198|0.56| -1| India|Australia|1571215104|1571215166|
|8374749403|London|1189193|1189198| 0.0| 1| India| England|4567362933|9374749392|
|7439430283| Dubai|1189193|1189198|0.76| -1|Pakistan|Sri Lanka|1576615684|4749383749|
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
df.printSchema
root
|-- col0: long (nullable = false)
|-- col1: string (nullable = false)
|-- col2: integer (nullable = false)
|-- col3: integer (nullable = false)
|-- col4: double (nullable = false)
|-- col5: integer (nullable = false)
|-- col6: string (nullable = false)
|-- col7: string (nullable = false)
|-- col8: long (nullable = false)
|-- col9: long (nullable = false)
Hope it helps
Upvotes: 2
Reputation: 8523
As the other posts mention, a DataFrame requires explicit types for each column, so you can't use Any. The easiest way I can think of is to turn each row into a tuple of the right types and then use implicit DataFrame creation to convert it to a DataFrame. You were pretty close with your code; you just need to cast the elements to an acceptable type.
Basically, toDF knows how to convert tuples (with accepted types) into a DataFrame Row, and you can pass the column names into the toDF call.
For example:
val data = Array(1556273771, "Mumbai", 1189193, 1189198, 0.56, -1, "India,Australia", 1571215104, 1571215166)
val rdd = sc.parallelize(Seq(data))

val df = rdd.map {
  case Array(a, b, c, d, e, f, g, h, i) => (
    a.asInstanceOf[Int],
    b.asInstanceOf[String],
    c.asInstanceOf[Int],
    d.asInstanceOf[Int],
    e.toString.toDouble,
    f.asInstanceOf[Int],
    g.asInstanceOf[String],
    h.asInstanceOf[Int],
    i.asInstanceOf[Int]
  )
}.toDF("int1", "city", "int2", "int3", "float1", "int4", "country", "int5", "int6")
df.printSchema
df.show(100, false)
scala> df.printSchema
root
|-- int1: integer (nullable = false)
|-- city: string (nullable = true)
|-- int2: integer (nullable = false)
|-- int3: integer (nullable = false)
|-- float1: double (nullable = false)
|-- int4: integer (nullable = false)
|-- country: string (nullable = true)
|-- int5: integer (nullable = false)
|-- int6: integer (nullable = false)
scala> df.show(100, false)
+----------+------+-------+-------+------+----+---------------+----------+----------+
|int1 |city |int2 |int3 |float1|int4|country |int5 |int6 |
+----------+------+-------+-------+------+----+---------------+----------+----------+
|1556273771|Mumbai|1189193|1189198|0.56 |-1 |India,Australia|1571215104|1571215166|
+----------+------+-------+-------+------+----+---------------+----------+----------+
Edit for 0 -> Double:
As André pointed out, if you start off with 0 as an Any, it will be a Java Integer, not a Scala Int, and it is therefore not castable to a Scala Double. Converting it to a string first lets you then convert it to a double as desired.
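The same behaviour can be reproduced outside Spark; this is a minimal sketch in the Scala REPL, not part of the answer's code:

val zero: Any = 0                 // boxed as java.lang.Integer
// zero.asInstanceOf[Double]      // would throw ClassCastException (Integer cannot be cast to Double)
val d = zero.toString.toDouble    // d: Double = 0.0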
Upvotes: 1
Reputation: 2451
You can try the approach below. It's a bit tricky, but it avoids having to define a schema: map each Any to String, create a DataFrame of arrays using toDF(), then build new columns by selecting each element from the array column.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Column
import spark.implicits._

val rdd: RDD[Array[Any]] = spark.range(5).rdd.map(s => Array(s, s + 1, s % 2))
val size = rdd.first().length

// For an array column, produce one (name, element) pair per position
def splitCol(col: Column): Seq[(String, Column)] = {
  for (i <- 0 to size - 1) yield ("_" + i, col(i))
}

rdd.map(_.map(_.toString))
  .toDF("x")                              // single column "x" of type array<string>
  .select(splitCol('x).map(_._2): _*)     // one column per array element
  .toDF(splitCol('x).map(_._1): _*)       // rename to _0, _1, _2
  .show()
+---+---+---+
| _0| _1| _2|
+---+---+---+
| 0| 1| 0|
| 1| 2| 1|
| 2| 3| 0|
| 3| 4| 1|
| 4| 5| 0|
+---+---+---+
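Note that with this approach every column comes out as a string. If you need typed columns afterwards, you can cast them; a small sketch, assuming the chained result above is first assigned to a val df instead of calling show() directly:

import org.apache.spark.sql.functions.col

df.select(
  col("_0").cast("long"),
  col("_1").cast("long"),
  col("_2").cast("int")
).printSchema()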
Upvotes: 0