Leonard

Reputation: 561

How to parallelize several apache spark rdds?

I have the following code:

sc.parquetFile("some large parquet file with bc").registerTempTable("bcs")
sc.parquetFile("some large parquet file with imps").registerTempTable("imps")
val bcs = sc.sql("select * from bcs")
val imps = sc.sql("select * from imps")

I want to do:

bcs.map(x => wrapBC(x)).collect
imps.map(x => wrapIMP(x)).collect

but when I do this, it doesn't run asynchronously. I can do it with Future, like this:

val bcsFuture = Future { bcs.map(x => wrapBC(x)).collect }
val impsFuture = Future { imps.map(x => wrapIMP(x)).collect }
val result = for {
  bcs <- bcsFuture
  imps <- impsFuture
} yield (bcs, imps)
Await.result(result, Duration.Inf) // this returns (Array[Bc], Array[Imp])

I want to do this without Future. How can I do it?

Upvotes: 1

Views: 3114

Answers (3)

Leonard

Reputation: 561

You can use union to solve this problem. For example:

val bcsAny  = bcs.map(x => wrapBC(x).asInstanceOf[Any])
val impsAny = imps.map(x => wrapIMP(x).asInstanceOf[Any])

val result = (bcsAny union impsAny).collect()
val bcsResult  = result collect { case bc: Bc => bc }
val impsResult = result collect { case imp: Imp => imp }

If you want to use sortBy or other operations, you can have both wrapper classes extend a common trait or base class.
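For example, a minimal sketch (the Event trait and the timestamp field are assumptions made up for illustration): give both wrappers a common parent, union the RDDs at that type, and you can still sort or filter on the shared fields.

trait Event { def timestamp: Long }
case class Bc(timestamp: Long, data: String) extends Event
case class Imp(timestamp: Long, data: String) extends Event

// Upcast both sides to the shared trait before the union, then sort on a common field
val events = bcs.map(x => wrapBC(x): Event) union imps.map(x => wrapIMP(x): Event)
val sortedByTime = events.sortBy(_.timestamp)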

Upvotes: 0

stholzm

Reputation: 3455

Update: I misunderstood the question. The desired result is not the cartesian product Array[(Bc, Imp)].

But I'd argue that it does not matter how long the single map calls take, because as soon as you add other transformations, Spark tries to combine them in an efficient way. As long as you only chain transformations on RDDs, nothing happens on the data. When you finally apply an action, the execution engine will figure out a way to produce the requested data.

So my advice would be to not think so much about the intermediate steps and avoid collect as much as possible because it will fetch all the data to the driver program.
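For instance (a sketch built on the question's RDDs, with a throwaway filter predicate), the first two lines only record lineage; nothing touches the data until the action on the last line:

val wrapped = bcs.map(x => wrapBC(x))    // transformation: lazy, nothing runs yet
val nonNull = wrapped.filter(_ != null)  // still lazy
val n = nonNull.count()                  // action: Spark plans and runs the job here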


It seems you are building a cartesian product yourself. Try cartesian instead:

val bc = bcs.map(x => wrapBC(x))
val imp = imps.map(x => wrapIMP(x))
val result = bc.cartesian(imp).collect

Note that collect is called on the final RDD and no longer on intermediate results.

Upvotes: 1

hayden.sikh

Reputation: 778

Update This was originally composed before the question was updated. Given those updates, I agree with @stholzm's answer to use cartesian in this case.


There do exist a limited number of actions which will produce a FutureAction[A] for an RDD[A] and be executed in the background. These are available on the AsyncRDDActions class, and as long as you import SparkContext._, any RDD can be implicitly converted to AsyncRDDActions as needed. For your specific code example that would be:

bcs.map(x => wrapBC(x)).collectAsync
imps.map(x => wrapIMP(x)).collectAsync

In addition to evaluating the DAG up to the action in the background, the FutureAction produced has a cancel method to attempt to end processing early.
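Since FutureAction extends scala.concurrent.Future, you can combine the two asynchronous collects much like the Future version in the question. A rough sketch (the implicit ExecutionContext import is only needed for the for-comprehension):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val bcsF = bcs.map(x => wrapBC(x)).collectAsync()    // FutureAction[Seq[Bc]]
val impsF = imps.map(x => wrapIMP(x)).collectAsync() // FutureAction[Seq[Imp]]

val both = for {
  b <- bcsF
  i <- impsF
} yield (b, i)

val (bcsResult, impsResult) = Await.result(both, Duration.Inf)
// bcsF.cancel() would attempt to stop that job early if needed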

Caveat

This may not do what you think it does. If the intent is to get data from both sources and then combine them, you're more likely to want to join or group the RDDs instead. For that, you can look at the functions available in PairRDDFunctions, again available on RDDs through implicit conversion.
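A hedged sketch of that approach (the userId field and its type are assumptions made only for illustration):

// Key both sides, then join on the cluster instead of collecting either one to the driver.
val bcByUser = bcs.map(x => wrapBC(x)).map(bc => (bc.userId, bc))
val impByUser = imps.map(x => wrapIMP(x)).map(imp => (imp.userId, imp))

val joined = bcByUser.join(impByUser)   // RDD[(userId type, (Bc, Imp))]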

If the intention isn't to have the data graphs interact, then in my experience running the batches concurrently might only serve to slow down both, though that may be a consequence of how the cluster is configured. If the resource manager is set up to give each execution stage a monopoly on the cluster in FIFO order (the default in standalone and YARN modes, I believe; I'm not sure about Mesos), then each of the asynchronous collects will contend with the others for that monopoly, run its tasks, then contend again for the next execution stage.

Compare this to using a Future to wrap blocking calls to downstream services or databases, for example, where the resources in question are either completely separate or generally have enough capacity to handle multiple requests in parallel without contention.

Upvotes: 2
