Reputation: 952
As the title describes, say I have two RDDs
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,0,0])
or
rdd3 = sc.parallelize([("Id", 1),("Id", 2),("Id",3)])
rdd4 = sc.parallelize([("Result", 1),("Result", 0),("Result", 0)])
How can I create the following DataFrame?
Id Result
1 1
2 0
3 0
If I could create the paired RDD [(1,1),(2,0),(3,0)], then sqlCtx.createDataFrame
would give me what I want, but I don't know how to build that RDD.
I'd appreciate any comment or help!
Upvotes: 2
Views: 2193
Reputation: 13927
So first off, there is an RDD operation called RDD.zipWithIndex, which pairs each element with its position. If you called rdd2.zipWithIndex you would get:
scala> rdd2.zipWithIndex.collect().foreach(println)
(1,0)
(0,1)
(0,2)
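If you wanted to make it look like yours, swap the pair and shift the index to start at 1. Note that this uses the position as the Id, which matches rdd1 only because rdd1 happens to be 1, 2, 3:
scala> rdd2.zipWithIndex.map(t => (t._2 + 1, t._1)).collect().foreach(println)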
(1,1)
(2,0)
(3,0)
If you really need to zip the two RDDs, then just use RDD.zip:
scala> rdd1.zip(rdd2).collect().foreach(println)
(1,1)
(2,0)
(3,0)
Upvotes: 2
Reputation: 7442
Provided that they have the same number of partitions and the same number of elements in each partition, you can use the zip function, e.g.
case class Elem(id: Int, result: Int)
val df = sqlCtx.createDataFrame(rdd1.zip(rdd2).map(x => Elem(x._1, x._2)))
Upvotes: 1