In the early-release textbook High Performance Spark, the developers of Spark note that:
To allow Spark the flexibility to spill some records to disk, it is important to represent your functions inside of mapPartitions in such a way that your functions don't force loading the entire partition in-memory (e.g. implicitly converting to a list). Iterators have many methods we can write functional style transformations on, or you can construct your own custom iterator. When a transformation directly takes and returns an iterator without forcing it through another collection, we call these iterator-to-iterator transformations.
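To illustrate the "construct your own custom iterator" option the book mentions, here is a minimal Java sketch (my own, not from the book) of a wrapper that applies a function lazily, one record at a time, without ever collecting the partition:

    import java.util.Iterator;
    import java.util.function.Function;

    // Lazily applies `f` to each element of `source`; no intermediate collection.
    class MappingIterator<A, B> implements Iterator<B> {
        private final Iterator<A> source;
        private final Function<A, B> f;

        MappingIterator(Iterator<A> source, Function<A, B> f) {
            this.source = source;
            this.f = f;
        }

        @Override
        public boolean hasNext() {
            return source.hasNext(); // delegates; never buffers
        }

        @Override
        public B next() {
            return f.apply(source.next()); // one record at a time
        }
    }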
However, the textbook lacks good examples using mapPartitions or similar variations of the method, and there are few good code examples online, most of which are in Scala. For example, we see this Scala code using mapPartitions, written by zero323 on How to add columns into org.apache.spark.sql.Row inside of mapPartitions:
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
Unfortunately, Java doesn't provide anything as nice as iter.map(...) for iterators. So the question is: how can one effectively use iterator-to-iterator transformations with mapPartitions without materializing the entire partition in memory as a list?
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current)); // buffers the entire partition in memory
    }
    return out.iterator();
});
This seems to be the general syntax for using mapPartitions in Java examples, but I don't see how it could be the most efficient approach, supposing you have a JavaRDD with tens of thousands of records (or even more, since Spark is for big data). You'd end up with a list of all the objects in the iterator, just to turn it back into an iterator (which suggests that a map function of some sort would be much more efficient here).
Note: while these 8 lines of code using mapPartitions could be written as 1 line with a map or flatMap, I'm intentionally using mapPartitions to take advantage of the fact that it operates over each partition rather than each element in the RDD (a sketch of the kind of per-partition advantage I mean follows below).
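(To make that motivation concrete, here is a sketch of the sort of per-partition win I'm after; ExpensiveResource and its process method are hypothetical stand-ins for anything costly to set up, such as a parser or a connection. It still has the list problem above, but it shows why I want per-partition scope:)

    JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
        // Hypothetical: pay the setup cost once per partition, not once per record.
        ExpensiveResource resource = new ExpensiveResource();
        ArrayList<OutObj> out = new ArrayList<>();
        while (iter.hasNext()) {
            out.add(resource.process(iter.next())); // hypothetical per-record use
        }
        return out.iterator();
    });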
Any ideas, please?
Upvotes: 14
Views: 15696
Reputation: 37832
One way to prevent forcing the "materialization" of the entire partition is by converting the Iterator into a Stream, and then using Stream's functional API (e.g. the map function). How to convert an iterator to a stream? suggests a few good ways to convert an Iterator into a Stream, so taking one of the options suggested there we can end up with:
import java.util.stream.StreamSupport;

rdd.mapPartitions((Iterator<InObj> iter) -> {
    // A one-shot Iterable whose iterator() returns the partition's iterator
    Iterable<InObj> iterable = () -> iter;
    // `false` requests a sequential (non-parallel) stream
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});
Which should be an "Iterator-to-Iterator" transformation, because all the intermediate APIs used (Iterable, Stream) are lazily evaluated.
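For what it's worth, here is a small self-contained sketch (outside Spark, with names invented for the demo) showing that such a pipeline really is lazy: the map function only runs as the resulting iterator is consumed, never over the whole input up front.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.stream.StreamSupport;

    public class LazyDemo {
        public static void main(String[] args) {
            Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
            Iterable<Integer> iterable = () -> source;
            Iterator<Integer> result = StreamSupport.stream(iterable.spliterator(), false)
                    .map(x -> {
                        System.out.println("mapping " + x); // runs on demand, one element at a time
                        return x * 10;
                    })
                    .iterator();
            System.out.println("nothing mapped yet"); // prints before any "mapping" line
            System.out.println(result.next());        // prints "mapping 1", then "10"
        }
    }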
EDIT: I haven't tested it myself, but the OP commented, and I quote, that "there is no efficiency increase by using a Stream over a list". I don't know why that is, and I don't know if it would be true in general, but it's worth mentioning.
Upvotes: 12