user1206899
user1206899

Reputation: 1920

Should the common ancestors on the transformation dependency graph be cached?

I'm relatively new to spark and might even be wrong before finishing building up the scenario questions so feel free to skip reading and point it out where you find I'm conceptually wrong, thanks!

Imagine a piece of driver code like this:

val A = ... (some transformation)
val B = A.filter( fun1 ) 
val C = A.filter( fun2 ) 
...
B.someAction()... //do sth with B
...
C.someAction()... //do sth with C

Transformation RDDs B and C both depend on A which might itself be a complex transformation. So will A be computed twice ? I argue that it will because spark can't do anything that's inter-transformations, right ? Spark is intelligent on optimizing one transformation execution at a time because the bundled tasks in it could be throughly analyzed. For example it's possible that some state change occurs after B.someAction but before C.someAction which may affect the value of A so the re-computation becomes necessary. For further example It could happen like this:

val arr = Array(...)
val A = sc.parallelize(...).flatMap(e => arr.map(_ * e)) //now A depends on some local array

... //B and C stays the same as above

B.someAction()
...
arr(i) = arr(i) + 10  //local state modified
...
C.someAction() //should A be recomputed? YES

This is easy to verify so I did a quick experiment and the result supports my reasoning.

However if B and C just independently depend on A and no other logic like above exists then a programmer or some tool could statically analyze the code and say hey it’s feasible to add a cache on A so that it doesn’t unnecessarily recompute! But spark can do nothing about this and sometimes it’s even hard for human to decide:

val A = ... (some transformation)
var B = A.filter( fun1 ) 
var C: ??? = null
var D: ??? = null

if (cond) {
  //now whether multiple dependencies exist is runtime determined
  C = A.filter( fun2 ) 
  D = A.filter( fun3 )
}

B.someAction()... //do sth with B

if (cond) {
  C.someAction()... //do sth with C
  D.someAction()... //do sth with D
}

If the condition is true then it’s tempting to cache A but you’ll never know until runtime. I know this is an artificial crappy example but these are already simplified models things could get more complicated in practice and the dependencies could be quite long and implicit and spread across modules so my question is what’s the general principle to deal with this kind of problem. When should the common ancestors on the transformation dependency graph be cached (provided memory is not an issue) ?

I’d like to hear something like always follow functional programming paradigms doing spark or always cache them if you can however there’s another situation that I may not need to:

val A = ... (some transformation)
val B = A.filter( fun1 ) 
val C = A.filter( fun2 ) 
...
B.join(C).someAction()

Again B and C both depend on A but instead of calling two actions separately they are joined to form one single transformation. This time I believe spark is smart enough to compute A exactly once. Haven’t found a proper way to run and examine yet but should be obvious in the web UI DAG. What's further I think spark can even reduce the two filter operations into one traversal on A to get B and C at the same time. Is this true?

Upvotes: 0

Views: 101

Answers (1)

ImDarrenG
ImDarrenG

Reputation: 2345

There's a lot to unpack here.

Transformation RDDs B and C both depend on A which might itself be a complex transformation. So will A be computed twice ? I argue that it will because spark can't do anything that's inter-transformations, right ?

Yes, it will be computed twice, unless you call A.cache() or A.persist(), in which case it will be calculated only once.

For example it's possible that some state change occurs after B.someAction but before C.someAction which may affect the value of A so the re-computation becomes necessary

No, this is not correct, A is immutable, therefore it's state cannot change. B and C are also immutable RDDs that represent transformations of A.

sc.parallelize(...).flatMap(e => arr.map(_ * e)) //now A depends on some local array

No, it doesn't depend on the local array, it is an immutable RDD containing the copy of the elements of the (driver) local array. If the array changes, A does not change. To obtain that behaviour you would have to var A = sc. parallelize(...) and then set A again when local array changes A = sc.paralellize(...). In that scenario, A isn't 'updated' it is replaced by a new RDD representation of the local array, and as such any cached version of A is invalid.

The subsequent examples you have posted benefit from caching A. Again because RDDs are immutable.

Upvotes: 1

Related Questions