samthebest

Reputation: 31553

How to transform RDD, Dataframe or Dataset straight to a Broadcast variable without collect?

Is there any way (or any plans) to be able to turn Spark distributed collections (RDDs, Dataframe or Datasets) directly into Broadcast variables without the need for a collect? The public API doesn't seem to have anything "out of box", but can something be done at a lower level?

I can imagine there is some 2x speedup potential (or more?) for this kind of operation. To explain what I mean in detail, let's work through an example:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

This causes all the data to be collected to the driver, then the data is broadcasted. This means the data is sent over the network essentially twice.

What would be nice is something like this:

val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

Here Spark could bypass collecting the data altogether and just move the data between the nodes.

BONUS

Furthermore, there could be a Monoid-like API (a bit like combineByKey) for situations where the .toMap (or whatever operation on Array[T]) is expensive but can be done in parallel. E.g. constructing certain Trie structures can be expensive, and this kind of functionality could open up great scope for algorithm design. The CPU work could also run while the IO is running, whereas the current broadcast mechanism is blocking (i.e. all IO, then all CPU, then all IO again).
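
To make this concrete, here is a purely hypothetical signature sketch of what such an API could look like (nothing like toBroadcastCombined exists in Spark; the name and shape are made up):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Purely hypothetical sketch: no such method exists in Spark today.
object BroadcastSyntax {
  implicit class RichRdd[T](rdd: RDD[T]) {
    // Build a partial structure per partition on the executors, then merge
    // the partials (Monoid-style) while the broadcast blocks are in flight.
    def toBroadcastCombined[B](
        createCombiner: Iterator[T] => B,  // e.g. build a partial Trie per partition
        mergeCombiners: (B, B) => B        // associative merge of two partials
    ): Broadcast[B] = ???                  // would need support inside Spark itself
  }
}

// Imagined usage:
// val myUberTrie: Broadcast[Trie] =
//   myStringPairRdd.toBroadcastCombined(buildTrie, mergeTries)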

CLARIFICATION

Joining is not the (main) use case here; it can be assumed that I use the broadcast data structure sparsely. For example, the keys in someOtherRdd by no means cover the keys in myUberMap, but I don't know which keys I need until I traverse someOtherRdd, AND suppose I use myUberMap multiple times.

I know that all sounds a bit vague, but the point is to enable more general machine learning algorithm design.

Upvotes: 9

Views: 2642

Answers (2)

zero323

Reputation: 330413

While this is an interesting idea, I will argue that although it is theoretically possible, it has very limited practical applications. Obviously I cannot speak for the PMC, so I cannot say if there are any plans to implement this type of broadcasting mechanism at all.

Possible implementation:

Since Spark already provides a torrent broadcast mechanism, whose behavior is described as follows:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.

Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.

it should be possible to reuse the same mechanism for direct node-to-node broadcasting.

It is worth noting that this approach cannot completely eliminate driver communication. Even though blocks could be created locally, you still need a single source of truth to advertise the set of blocks to fetch.

Limited applications

One problem with broadcast variables is that they are quite expensive. Even if you can eliminate the driver bottleneck, two problems remain:

  • The memory required to store the deserialized object on each executor.
  • The cost of transferring the broadcast data to every executor.

The first problem should be relatively obvious. It is not only about direct memory usage but also about GC cost and its effect on overall latency. The second one is rather subtle. I partially covered this in my answer to Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark, but let's discuss this further.

From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if the dataset is large enough for the driver to become a bottleneck, it is unlikely to be a good candidate for broadcasting, and a targeted approach like a hash join may be preferable in practice.
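
To illustrate with two hypothetical DataFrames factsDf and lookupDf (the names and the join column are made up), Spark SQL lets you express both strategies explicitly:

import org.apache.spark.sql.functions.broadcast

// Broadcast join: every executor receives a full copy of lookupDf, so the
// data shipped grows with (number of executors) x (size of lookupDf).
val viaBroadcast = factsDf.join(broadcast(lookupDf), Seq("key"))

// Shuffled join: both sides are repartitioned by key, so each row crosses
// the network roughly once, which is usually preferable once the "small"
// side stops being small.
val viaShuffle = factsDf.join(lookupDf, Seq("key"))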

Alternatives:

There are some methods which can be used to achieve results similar to a direct broadcast and address the issues enumerated above, including:

  • Passing data via a distributed file system (a rough sketch follows below).
  • Using a replicated database collocated with the worker nodes.
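
As a rough sketch of the first option (the path, file format, and parsing below are made up for illustration):

import org.apache.spark.SparkFiles

// Register a file that lives on a file system every node can reach.
sc.addFile("hdfs:///data/uber-map.tsv")

someOtherRdd.mapPartitions { iter =>
  // Each executor reads its local copy of the file; no collect on the driver.
  val uberMap: Map[String, String] =
    scala.io.Source.fromFile(SparkFiles.get("uber-map.tsv"))
      .getLines()
      .map { line =>
        val Array(k, v) = line.split("\t", 2)
        k -> v
      }
      .toMap
  iter.map(key => uberMap.get(key))  // stand-in for someCodeUsingTheUberMap
}

In practice you would cache the parsed map in a JVM-wide singleton rather than re-parse it for every partition.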

Upvotes: 6

Andrei Stankevich

Reputation: 211

I don't know if you can do it for an RDD, but you can do it for a DataFrame:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions

val df: DataFrame = your_data_frame

val broadcasted_df = functions.broadcast(df)

Now you can use the variable broadcasted_df and it will be broadcast to the executors.

Make sure the broadcasted_df DataFrame is not too big and can be sent to the executors.

broadcasted_df will be broadcast in operations like, for example,

other_df.join(broadcasted_df)

and in this case the join() operation executes faster, because every executor has a partition of other_df and the whole of broadcasted_df.
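
If you want to confirm that the broadcast actually kicked in (assuming a join column called key here), the physical plan should contain a BroadcastHashJoin:

other_df.join(broadcasted_df, Seq("key")).explain()
// the printed physical plan should include a BroadcastHashJoin node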

For your question, I am not sure you can do what you want. You cannot use one RDD inside the map() method of another RDD, because Spark does not allow transformations inside transformations. In your case you need to call collect() to build a Map from your RDD, because inside map() you can only use an ordinary Map object, not an RDD.
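
In code, that boils down to the pattern already shown in the question (assuming string elements in someOtherRdd for illustration):

import org.apache.spark.broadcast.Broadcast

// Collect to the driver, build a plain Map, broadcast it, and read it via
// .value inside map(). A plain Map is fine in a closure; an RDD is not.
val uberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map { element =>
  uberMap.value.getOrElse(element, element)  // stand-in for someCodeUsingTheUberMap
}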

Upvotes: 0
