Reputation: 18053
When running:
rdd.zipWithIndex().map { case ((a,b),c) => (a,b,c)}.collect()
that works fine.
But what if (a,b) is in fact a Product with Serializable of arbitrary arity, e.g. 2, 3 or 4 elements, or even N elements? How can the case be written for any number of values? With a List?
Upvotes: 0
Views: 593
Reputation: 44937
First I would like to say what you could do, and then I'd like to say why you shouldn't.
You could do this:
// generator used to produce the match cases below:
// for (arity <- 2 to 20) {
// println(
// "case (" +
// (0 until arity).map(i => ('a' + i).toChar).mkString("(", ",", ")") +
// ",idx) => (" +
// (0 until arity).map(i => ('a' + i).toChar).mkString(",") +
// ",idx)"
// )
// }
rdd.zipWithIndex.map{
case ((a,b),idx) => (a,b,idx)
case ((a,b,c),idx) => (a,b,c,idx)
case ((a,b,c,d),idx) => (a,b,c,d,idx)
case ((a,b,c,d,e),idx) => (a,b,c,d,e,idx)
case ((a,b,c,d,e,f),idx) => (a,b,c,d,e,f,idx)
case ((a,b,c,d,e,f,g),idx) => (a,b,c,d,e,f,g,idx)
case ((a,b,c,d,e,f,g,h),idx) => (a,b,c,d,e,f,g,h,idx)
case ((a,b,c,d,e,f,g,h,i),idx) => (a,b,c,d,e,f,g,h,i,idx)
case ((a,b,c,d,e,f,g,h,i,j),idx) => (a,b,c,d,e,f,g,h,i,j,idx)
case ((a,b,c,d,e,f,g,h,i,j,k),idx) => (a,b,c,d,e,f,g,h,i,j,k,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,idx)
case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,idx)
case _ => throw new Error("not a tuple")
}
Given that the source code for Product2
through Product22
is itself generated, this might look like an ugly yet acceptable solution at first glance.
However, notice that if you do this once, you end up in exactly the same situation at the next step: you again have an RDD full of tuples of different arities. What do you do with those? The same horrible match-case? If you keep going like this, it will infect your entire codebase.
Therefore, it's better to bite the bullet right away and convert the tuples to some kind of Seq
as soon as possible. You can use productIterator
(available on every TupleN, since they all extend Product) for that. Consider producing Lists or spark.sql Rows. Maybe something like this:
rdd.zipWithIndex.map{
case (tuple, idx) =>
Row.fromSeq(tuple.productIterator.toSeq :+ idx)
}
Upvotes: 1
Reputation: 993
This example uses a List instead of an RDD, but the core logic remains the same: use the productIterator method of the tuple.
scala> val input = List((1, 2L, "A"), (3, 4L, "B"))
input: List[(Int, Long, String)] = List((1,2,A), (3,4,B))
scala> val output = input.zipWithIndex.map{
| case (a, b) =>
| // where a => (Int, Long, String)
| // hence a.productIterator is Iterator[Any]
| a.productIterator ++ Seq(b)
| }
output: List[Iterator[Any]] = List(non-empty iterator, non-empty iterator)
scala> output.foreach(x => println(x.mkString(", ")))
1, 2, A, 0
3, 4, B, 1
The idea is not to extract the tuple's elements, but to keep the tuple intact, call productIterator on it, and append to the result. This approach works for records of any type in the RDD, including complex nested data structures.
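To illustrate that last claim, here is a minimal, Spark-free sketch (the case class and values are made up for this example) showing that productIterator also handles records containing nested case classes and collections:

```scala
// Hypothetical nested record type, made up for this illustration.
case class User(name: String, age: Int)

// productIterator is available on every Product: tuples and case classes alike.
val record = (User("ann", 30), List(1, 2), "tag")

// Same trick as in the answer: flatten the tuple and append an index.
val flattened: Seq[Any] = record.productIterator.toSeq :+ 0L

println(flattened.mkString(", "))
```

Note that productIterator is shallow: the nested User and List come out as single elements rather than being recursed into, which is usually what you want when appending an index.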
Upvotes: 2