Reputation: 1808
I have two RDDs with the same keys and different values.
I call .partitionBy(partitioner) on both of them, using the same partitioner,
and then I join them:
import org.apache.spark.HashPartitioner
val partitioner = new HashPartitioner(partitions = 4)
val a = spark.sparkContext.makeRDD(Seq(
(1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), (6, "F"), (7, "G"), (8, "H")
)).partitionBy(partitioner)
val b = spark.sparkContext.makeRDD(Seq(
(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f"), (7, "g"), (8, "h")
)).partitionBy(partitioner)
println("A:")
a.foreachPartition(p => {
p.foreach(t => print(t + " "))
println()
})
println("B:")
b.foreachPartition(p => {
p.foreach(t => print(t + " "))
println()
})
println("Join:")
a.join(b, partitioner)
.foreachPartition(p => {
p.foreach(t => print(t + " "))
println()
})
I get:
A:
(2,B) (3,C) (4,D) (6,F) (7,G)
(8,H) (1,A)
(5,E)
B:
(3,c) (7,g)
(1,a) (5,e)
(2,b) (6,f)
(4,d) (8,h)
Join:
(6,(F,f)) (1,(A,a)) (2,(B,b)) (5,(E,e)) (4,(D,d)) (8,(H,h))
(3,(C,c)) (7,(G,g))
So my question is: why are the partitions of A and B different, and why is the joined RDD partitioned differently from both of them?
Upvotes: 1
Views: 493
Reputation: 330193
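The partitioning is exactly the same in all cases. The problem is the method you use to print it. Remember that each partition is processed in a separate thread, so the print calls from different partitions interleave on standard output and the line breaks no longer line up with partition boundaries. If you run this code multiple times, you'll see that the output is actually non-deterministic.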
Try for example something like this instead:
a.glom.collect.map(_.mkString(" ")).foreach(println)
(4,D) (8,H)
(1,A) (5,E)
(2,B) (6,F)
(3,C) (7,G)
b.glom.collect.map(_.mkString(" ")).foreach(println)
(4,d) (8,h)
(1,a) (5,e)
(2,b) (6,f)
(3,c) (7,g)
a.join(b).glom.collect.map(_.mkString(" ")).foreach(println)
(4,(D,d)) (8,(H,h))
(1,(A,a)) (5,(E,e))
(6,(F,f)) (2,(B,b))
(3,(C,c)) (7,(G,g))
Note that the order of values within each partition can still be non-deterministic if the code is executed in a non-local
context, but the content of each partition will be the same as shown above.
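To see why the content of each partition is fixed, here is a minimal sketch in plain Scala (no Spark needed) of the rule HashPartitioner applies: the key's hashCode modulo the number of partitions, shifted into the non-negative range. The names HashPartitionSketch, partitionOf and glomLike are made up for this illustration and are not part of the Spark API; glomLike only imitates what glom shows after partitionBy.

```scala
object HashPartitionSketch {
  // Same rule as HashPartitioner: hashCode modulo numPartitions,
  // made non-negative; null keys go to partition 0.
  def partitionOf(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }

  // Hypothetical helper: groups the pairs by target partition index,
  // imitating the per-partition view that glom gives after partitionBy.
  def glomLike[K, V](pairs: Seq[(K, V)], numPartitions: Int): Vector[Seq[(K, V)]] =
    Vector.tabulate(numPartitions) { i =>
      pairs.filter { case (k, _) => partitionOf(k, numPartitions) == i }
    }

  def main(args: Array[String]): Unit = {
    val a = Seq(1 -> "A", 2 -> "B", 3 -> "C", 4 -> "D",
                5 -> "E", 6 -> "F", 7 -> "G", 8 -> "H")
    // An Int key hashes to itself, so key k lands in partition k % 4.
    glomLike(a, 4).foreach(p => println(p.mkString(" ")))
  }
}
```

For Int keys the hash is the value itself, so 4 and 8 go to partition 0, 1 and 5 to partition 1, and so on, for a, for b, and for the join alike. That is the same grouping the glom output above shows.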
Upvotes: 2