Ged

Reputation: 18053

map & case for Product with Serializable

When running:

rdd.zipWithIndex().map { case ((a,b),c) => (a,b,c)}.collect()

That works fine.

What if (a,b) is in fact a Product with Serializable, e.g. a tuple of 2, 3 or 4 elements, or even N elements? How can I write the case for an arbitrary number of values? Should I use a List?

Upvotes: 0

Views: 593

Answers (2)

Andrey Tyukin

Reputation: 44937

First I would like to say what you could do, and then I'd like to say why you shouldn't.

You could do this:

// for (arity <- 2 to 20) {
//   println(
//     "case (" + 
//     (0 until arity).map(i => ('a' + i).toChar).mkString("(", ",", ")") + 
//     ",idx) => (" +
//     (0 until arity).map(i => ('a' + i).toChar).mkString(",") +
//     ",idx)"
//   )
// }

rdd.zipWithIndex.map{
  case ((a,b),idx) => (a,b,idx)
  case ((a,b,c),idx) => (a,b,c,idx)
  case ((a,b,c,d),idx) => (a,b,c,d,idx)
  case ((a,b,c,d,e),idx) => (a,b,c,d,e,idx)
  case ((a,b,c,d,e,f),idx) => (a,b,c,d,e,f,idx)
  case ((a,b,c,d,e,f,g),idx) => (a,b,c,d,e,f,g,idx)
  case ((a,b,c,d,e,f,g,h),idx) => (a,b,c,d,e,f,g,h,idx)
  case ((a,b,c,d,e,f,g,h,i),idx) => (a,b,c,d,e,f,g,h,i,idx)
  case ((a,b,c,d,e,f,g,h,i,j),idx) => (a,b,c,d,e,f,g,h,i,j,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k),idx) => (a,b,c,d,e,f,g,h,i,j,k,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,idx)
  case ((a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t),idx) => (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,idx)
  case (other, _) => throw new IllegalArgumentException(s"unsupported tuple arity: $other")
}
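The commented-out loop above is what produces those case lines, so nobody has to type them by hand. As a runnable sketch of that same generator (CaseGenerator, caseLine and allCases are hypothetical names, not part of any library), you could run it once and paste its output into the match:

```scala
// Hypothetical helper that generates the match-case lines used above,
// one line per tuple arity, mirroring the commented-out loop.
object CaseGenerator {
  // Builds e.g. "case ((a,b),idx) => (a,b,idx)" for arity 2.
  def caseLine(arity: Int): String = {
    // Variable names a, b, c, ... one per tuple element
    val names = (0 until arity).map(i => ('a' + i).toChar)
    "case (" + names.mkString("(", ",", ")") + ",idx) => (" +
      names.mkString(",") + ",idx)"
  }

  // Arities 2 through 20, matching the hand-written block above.
  def allCases: Seq[String] = (2 to 20).map(caseLine)

  def main(args: Array[String]): Unit = allCases.foreach(println)
}
```

Generating the cases only hides the repetition; the resulting match is still just as large, which is the problem discussed next.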

Given that the source code for Product2 to Product22 is itself generated, this might look like an ugly yet acceptable solution at first glance.

However, notice that once you do this, you end up in exactly the same situation at the next step: you again have an RDD full of tuples of different arities. What do you do with those? The same horrible match-case? If you keep going this way, it will infect your entire codebase.

Therefore, it's better to bite the bullet right away and convert each record to some kind of Seq as soon as possible. You can use productIterator for that. Consider using Lists or spark.sql Rows, for example:

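import org.apache.spark.sql.Row

rdd.zipWithIndex.map { case (tuple, idx) =>
  // productIterator yields every field regardless of arity; append the index
  Row.fromSeq(tuple.productIterator.toSeq :+ idx)
}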
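To try the productIterator idea without a Spark cluster, here is a minimal sketch that uses a plain List in place of the RDD (an assumption for illustration; FlattenWithIndex and flatten are hypothetical names). List.zipWithIndex yields Int indices, so they are converted to Long to mirror what RDD.zipWithIndex returns:

```scala
// Sketch: flatten records of mixed tuple arity into List[Any] and
// append the index. A plain List stands in for the RDD here.
object FlattenWithIndex {
  def flatten(records: List[Product]): List[List[Any]] =
    records.zipWithIndex.map { case (tuple, idx) =>
      // toList makes each row strict and re-traversable;
      // idx.toLong mirrors the Long index of RDD.zipWithIndex
      tuple.productIterator.toList :+ idx.toLong
    }

  def main(args: Array[String]): Unit = {
    val mixed: List[Product] = List((1, "x"), (2, "y", 3.0), (4, 5, 6, 7))
    flatten(mixed).foreach(row => println(row.mkString(", ")))
  }
}
```

One function handles every arity, so downstream code only ever deals with a sequence of values, at the cost of losing static element types.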

Upvotes: 1

Pavithran Ramachandran

Reputation: 993

I'm using a List instead of an RDD here, but the core logic remains the same: use the tuple's productIterator method.

scala> val input = List((1, 2L, "A"), (3, 4L, "B"))
input: List[(Int, Long, String)] = List((1,2,A), (3,4,B))

scala> val output = input.zipWithIndex.map{
     |       case (a, b) =>
     |         // where a => (Int, Long, String)
     |         // hence a.productIterator is Iterator[Any]
     |         a.productIterator ++ Seq(b)
     |     }
output: List[Iterator[Any]] = List(non-empty iterator, non-empty iterator)

scala> output.foreach(x => println(x.mkString(", ")))
1, 2, A, 0
3, 4, B, 1

The idea is not to destructure the tuple, but to keep it intact, call productIterator on it, and append the index. This approach works for records of any tuple arity (in fact for any Product, such as a case class), including nested or complex data structures.
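One caveat with the transcript above: output is a List[Iterator[Any]], and an Iterator is exhausted after a single pass, so printing the rows once leaves them empty. A minimal sketch that materialises each row as a List instead, and also shows that a case class works because it is a Product (AppendIndex, withIndex and Event are hypothetical names):

```scala
// Sketch: same productIterator idea, but each row becomes a List so it
// can be traversed more than once. Event is a hypothetical case class;
// case classes extend Product just like tuples do.
object AppendIndex {
  final case class Event(id: Int, name: String)

  def withIndex(records: List[Product]): List[List[Any]] =
    records.zipWithIndex.map { case (p, i) => p.productIterator.toList :+ i }

  def main(args: Array[String]): Unit = {
    val rows = withIndex(List((1, 2L, "A"), Event(7, "click")))
    rows.foreach(r => println(r.mkString(", ")))
    // A second traversal still works because rows hold Lists, not Iterators
    println(rows.map(_.size))
  }
}
```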

Upvotes: 2
