zork
zork

Reputation: 2135

Spark: How to 'scan' RDD collections?

Does Spark have any analog of Scala scan operation to work on RDD collections? (for details please see Reduce, fold or scan (Left/Right)?)

For example:

val abc = List("A", "B", "C")

def add(res: String, x: String) = { 
  println(s"op: $res + $x = ${res + x}")
  res + x
} 

So to get:

abc.scanLeft("z")(add)
// op: z + A = zA      // same operations as foldLeft above...
// op: zA + B = zAB
// op: zAB + C = zABC
// res: List[String] = List(z, zA, zAB, zABC) // maps intermediate results

Any other means to achieve the same result?

Update

What is "Spark" way to solve, for example, the following problem:

Compute elements of the vector as (in pseudocode):

x(i) = SomeFun(for k from 0 to i-1)(y(k)) 

Should I collect RDD for this? No other way?

Update 2

Ok, I understand the general problem. Yet maybe you could advise me on the particular case I have to deal with.

I have a list of ints as input RDD and I have to build an outptut RDD, where the following should hold:

1) input.length == output.length // output list is of the same length as input

2) output(i) = sum( range (0..i), input(i)) / q^i // i-th element of output list equals sum of input elements from 0 to i divided by i-th power of some constant q   

In fact I need a combination of map and fold function to solve this.

Another idea is to write a recursive fold on diminishing tails of the input list. But this is super inefficient and AFAIK Spark does not have tail or init function for RDD.

How would you solve this problem in Sparck?

Upvotes: 1

Views: 1797

Answers (1)

WestCoastProjects
WestCoastProjects

Reputation: 63192

You are correct that there does not exist the analog of scan() in the generic RDD.

A potential explantion: Such a method would require access to all elements of the distributed collection to process each element of the generated output collection. before continuing on to the next output element.

So if your input list were say 1 million plus one entries there would be 1 million shuffle operations on the cluster (even though the sorting is not required here - spark gives it for "free" when doing a cluster collect step).

UPDATE OP has expanded the question. Here is response to the expanded question.

from updated OP:

x(i) = SomeFun(for k from 0 to i-1)(y(k)) 

You need to distinguish whether x(i) computation - specifically the y(k) function - were going to either:

  • require access to the entire dataset x(0 .. i -1)
  • change the structure of the dataset

on each iteration. That is the case for scan - and given your description it seems to be your purpose. AFAIK this is not supported in Spark. Once again - think if you were developing the distributed framework. How would you achieve same? It does not seem to be a scalable means to achieve - so yes you would need to do that computation in an

collect()

invocation against the original RDD and perform it on the Driver.

Upvotes: 2

Related Questions