user4728253

Apache Spark: Effectively using mapPartitions in Java

In the textbook High Performance Spark (currently in early release), the developers of Spark note that:

To allow Spark the flexibility to spill some records to disk, it is important to represent your functions inside of mapPartitions in such a way that your functions don’t force loading the entire partition in-memory (e.g. implicitly converting to a list). Iterators have many methods we can write functional style transformations on, or you can construct your own custom iterator. When a transformation directly takes and returns an iterator without forcing it through another collection, we call these iterator-to-iterator transformations.

However, the textbook lacks good examples using mapPartitions or similar variations of the method, and there are few good code examples online, most of which are in Scala. For example, we see this Scala code using mapPartitions written by zero323 on How to add columns into org.apache.spark.sql.Row inside of mapPartitions:

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

Unfortunately, Java doesn't provide anything as nice as iter.map(...) for iterators. So this raises the question: how can one effectively use iterator-to-iterator transformations with mapPartitions without materializing the entire partition in memory as a list?

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current)); // eagerly buffers every transformed record
    }
    return out.iterator(); // by now the whole partition is held in memory
});

This seems to be the general syntax for using mapPartitions in Java examples, but I don't see how it could be the most efficient approach, supposing you have a JavaRDD with tens of thousands of records (or even more, since Spark is meant for big data). You'd end up with a list of all the objects in the iterator, just to turn it back into an iterator (which suggests that a map function of some sort would be much more efficient here).

Note: while these 8 lines of code using mapPartitions could be written as 1 line with a map or flatMap, I'm intentionally using mapPartitions to take advantage of the fact that it operates over each partition rather than over each element in the RDD (a sketch of why that matters follows).
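A hypothetical illustration of that per-partition advantage (DbConnection, createDbConnection, and lookup are placeholder names I'm making up, not real APIs): expensive setup happens once per partition and is shared by every record in it.

JavaRDD<OutObj> enriched = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    DbConnection conn = createDbConnection(); // paid once per partition, not once per record
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        out.add(lookup(conn, iter.next())); // every record reuses the same connection
    }
    conn.close();
    return out.iterator();
});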

Any ideas, please?

Upvotes: 14

Views: 15696

Answers (1)

Tzach Zohar

Reputation: 37832

One way to prevent forcing the "materialization" of the entire partition is to convert the Iterator into a Stream, and then to use Stream's functional API (e.g. its map function).

The question How to convert an iterator to a stream? suggests a few good ways to convert an Iterator into a Stream, so taking one of the options suggested there we can end up with:

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter; // a one-shot Iterable wrapping the iterator
    return StreamSupport.stream(iterable.spliterator(), false) // requires java.util.stream.StreamSupport
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator(); // nothing is evaluated until Spark consumes this iterator
});

Which should be an "Iterator-to-Iterator" transformation, because all the intermediate APIs used (Iterable, Stream) are lazily evaluated.
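To convince yourself of that laziness, here is a tiny standalone demo (my own, not from Spark) showing that the map function only runs as the resulting iterator is consumed:

import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.StreamSupport;

public class LazinessDemo {
    public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
        Iterable<Integer> iterable = () -> source;
        Iterator<Integer> out = StreamSupport.stream(iterable.spliterator(), false)
                .map(n -> {
                    System.out.println("mapping " + n); // marks when evaluation happens
                    return n * 10;
                })
                .iterator();
        System.out.println("iterator built, nothing mapped yet");
        while (out.hasNext()) {
            System.out.println("got " + out.next()); // elements are mapped one at a time here
        }
    }
}

The "iterator built" line prints before any "mapping" line, and each element is mapped only as the loop advances, so no intermediate collection of the whole partition is ever built.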

EDIT: I haven't tested it myself, but the OP commented, and I quote, that "there is no efficiency increase by using a Stream over a list". I don't know why that is, and I don't know whether it would be true in general, but it's worth mentioning.
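If the Stream machinery itself turns out to be the overhead, the same laziness can be had with a plain hand-rolled iterator; a minimal sketch (MappedIterator is a made-up name, and Guava's Iterators.transform provides essentially the same thing):

import java.util.Iterator;
import java.util.function.Function;

// Wraps a source iterator and applies f to each element on demand;
// no element is touched until the consumer asks for it.
class MappedIterator<A, B> implements Iterator<B> {
    private final Iterator<A> source;
    private final Function<A, B> f;

    MappedIterator(Iterator<A> source, Function<A, B> f) {
        this.source = source;
        this.f = f;
    }

    @Override public boolean hasNext() { return source.hasNext(); }
    @Override public B next() { return f.apply(source.next()); }
}

It would be used the same way: rdd.mapPartitions(iter -> new MappedIterator<>(iter, s -> transformRow(s))).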

Upvotes: 12
