I have two key-value pair RDDs, A and B, whose data looks like this:
A={(1,(1,john,CA)),
(2,(2,steve,NY)),
(3,(3,jonny,AL)),
(4,(4,Evan,AK)),
(5,(5,Tommy,AZ))}
B={(1,(1,john,WA)),
(1,(1,john,FL)),
(1,(1,john,GA)),
(2,(2,steve,NY)),
(3,(3,jonny,AL)),
(4,(4,Evan,AK)),
(5,(5,Tommy,AZ))}
RDD B has three values for key 1, so when I apply cogroup
val c = A.cogroup(B).filter { x => x._2._1 != x._2._2 }.collect()
I get
c = {(1,CompactBuffer(1,john,CA),CompactBuffer(1,john,WA,1,john,FL,1,john,GA))}
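To see the shape of what cogroup followed by the filter returns without starting Spark, here is a minimal plain-Scala sketch that mimics it on local collections. CogroupSketch and localCogroup are hypothetical names used only for illustration, and Seq stands in for both the RDDs and CompactBuffer; Spark's real cogroup runs distributed over partitions.

```scala
// Plain-Scala sketch (no Spark) of cogroup + filter on the question's sample data.
// Seq is a stand-in for both the RDDs and CompactBuffer; names are hypothetical.
object CogroupSketch {
  type Rec = (Int, String, String)

  val a: Seq[(Int, Rec)] = Seq(
    (1, (1, "john", "CA")), (2, (2, "steve", "NY")), (3, (3, "jonny", "AL")),
    (4, (4, "Evan", "AK")), (5, (5, "Tommy", "AZ")))

  val b: Seq[(Int, Rec)] = Seq(
    (1, (1, "john", "WA")), (1, (1, "john", "FL")), (1, (1, "john", "GA")),
    (2, (2, "steve", "NY")), (3, (3, "jonny", "AL")),
    (4, (4, "Evan", "AK")), (5, (5, "Tommy", "AZ")))

  // For every key present on either side, collect that side's values for the key.
  def localCogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Seq[V], Seq[W]))] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      (k, (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
    }
  }

  // Same filter as in the question: keep only keys whose two groups differ.
  val c: Seq[(Int, (Seq[Rec], Seq[Rec]))] =
    localCogroup(a, b).filter { x => x._2._1 != x._2._2 }

  def main(args: Array[String]): Unit = c.foreach(println)
}
```

Only key 1 survives, and its second group holds three separate records, which is what the loops further down need to iterate over.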
I then collect the two CompactBuffers into two variables:
val d = c.map(tuple => tuple._2._1.mkString(""))
val e = c.map(tuple => tuple._2._2.mkString(""))
and iterate over d and e like this:
for (x <- d) {
  for (y <- e) {
    println(x + " source and destination " + y)
  }
}
Expected output:
1,john,CA source and destination 1,john,WA
1,john,CA source and destination 1,john,FL
1,john,CA source and destination 1,john,GA
Actual output:
1,john,CA source and destination 1,john,WA,1,john,FL,1,john,GA
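The collapsed line can be reproduced with plain Scala collections. In this sketch the buffers are modelled as Seqs of tuples (an assumption; the question does not show the exact element type), and MkStringDemo is a hypothetical name. Because mkString joins a whole buffer into one String, e ends up with a single element, so the inner loop runs only once:

```scala
// Plain-Scala reproduction (no Spark) of why the inner loop prints one line.
// Records are assumed to be tuples; MkStringDemo is a hypothetical name.
object MkStringDemo {
  val sources = Seq((1, "john", "CA"))
  val destinations = Seq((1, "john", "WA"), (1, "john", "FL"), (1, "john", "GA"))

  // Stand-in for the collected cogroup result: one key with its two groups.
  val c = Seq((1, (sources, destinations)))

  // Same expression as in the question: each buffer becomes ONE String.
  val e: Seq[String] = c.map(tuple => tuple._2._2.mkString(""))

  def main(args: Array[String]): Unit = {
    println(e.size) // 1
    println(e.head) // (1,john,WA)(1,john,FL)(1,john,GA)
  }
}
```

The exact joined text depends on the element type, but the effect is the same: three records become one string.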
What should I change so that I iterate over the individual elements of the second tuple element, i.e. the second CompactBuffer?
Please let me know if anything in the question needs clarification.
As suggested in the comments, mkString was turning each CompactBuffer into a single String, so e held one string per key instead of one element per record. Alternatively, you can convert each buffer to an Array and iterate over it:
c.foreach { x =>
  // x._2._1 holds this key's records from A, x._2._2 its records from B
  val arr1 = x._2._1.toArray
  val arr2 = x._2._2.toArray
  for (e1 <- arr1) {
    for (e2 <- arr2) {
      println(e1 + "-----------" + e2)
    }
  }
}
(1,john,CA)-----------(1,john,WA)
(1,john,CA)-----------(1,john,FL)
(1,john,CA)-----------(1,john,GA)
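If you want the pairs as values rather than printed lines, a for-comprehension can walk each key's two groups directly. This is a local sketch: c is rebuilt by hand from the question's data as a Seq instead of the collected Array, and PairsPerKey and pairs are hypothetical names.

```scala
// Sketch: build (source, destination) pairs per key with a for-comprehension.
// `c` is a hand-built local stand-in for the collected cogroup result.
object PairsPerKey {
  type Rec = (Int, String, String)

  val c: Seq[(Int, (Iterable[Rec], Iterable[Rec]))] = Seq(
    (1, (Seq((1, "john", "CA")),
         Seq((1, "john", "WA"), (1, "john", "FL"), (1, "john", "GA")))))

  // Generators walk one level each: the keys, then the sources, then the destinations.
  def pairs(groups: Seq[(Int, (Iterable[Rec], Iterable[Rec]))]): Seq[(Rec, Rec)] =
    for {
      (_, (sources, destinations)) <- groups
      src <- sources
      dst <- destinations
    } yield (src, dst)

  def main(args: Array[String]): Unit =
    pairs(c).foreach { case (s, d) => println(s"$s source and destination $d") }
}
```

Because sources and destinations come from the same key, each source is only ever paired with its own destinations.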
To keep the code you had written, replace the map/mkString calls with flatMap, which flattens each buffer into its individual records:
val d = c.flatMap(tuple => tuple._2._1)
val e = c.flatMap(tuple => tuple._2._2)
Then run your nested for loop as before.
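One thing to keep in mind with the flatMap version: d and e are flattened independently, so if more than one key survives the filter, the nested loop also pairs the sources of one key with the destinations of another. The sketch below illustrates this with an invented second key (6, mary); those records and the FlatMapAcrossKeys name are hypothetical. Keeping the key in scope, as the foreach version does, avoids the cross-key pairs.

```scala
// Sketch with HYPOTHETICAL data: key 6 is invented to show what happens
// when more than one key survives the cogroup filter.
object FlatMapAcrossKeys {
  type Rec = (Int, String, String)

  val c: Seq[(Int, (Seq[Rec], Seq[Rec]))] = Seq(
    (1, (Seq((1, "john", "CA")), Seq((1, "john", "WA"), (1, "john", "FL")))),
    (6, (Seq((6, "mary", "TX")), Seq((6, "mary", "OR")))))

  // flatMap approach: each side is flattened on its own, losing the key.
  val d: Seq[Rec] = c.flatMap(tuple => tuple._2._1)
  val e: Seq[Rec] = c.flatMap(tuple => tuple._2._2)
  val flattenedPairs: Seq[(Rec, Rec)] = for (x <- d; y <- e) yield (x, y)

  // Pairing inside each key keeps sources with their own destinations.
  val perKeyPairs: Seq[(Rec, Rec)] =
    c.flatMap { case (_, (srcs, dsts)) => for (x <- srcs; y <- dsts) yield (x, y) }

  def main(args: Array[String]): Unit = {
    println(flattenedPairs.size) // 6: includes cross-key pairs such as (1,john,CA) with (6,mary,OR)
    println(perKeyPairs.size)    // 3
  }
}
```

With the question's data only key 1 survives, so both approaches print the same three lines there.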