mandy

Reputation: 303

Creating a custom Spark RDD in Python

Is it possible to extend Spark's RDDs in Python to add custom operators? If it's not possible, how can one wrap Scala code for a class that extends an RDD, such as the one here: http://blog.madhukaraphatak.com/extending-spark-api/

Edit: I am trying to create a new RDD, say PersonRDD, and add a set of new operators to it, e.g. PersonRDD.computeMedianIncome(). According to the link below, this is not trivial to do in Python. However, since it's an old thread, I was wondering whether there have been any updates. If not, I would like to use Scala to do it, but I am not sure how to call the class from Python using Py4J (mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/…)

Any advice or help would be greatly appreciated.

Mandy

Upvotes: 5

Views: 3717

Answers (2)

Jan van der Vegt

Reputation: 1511

I had a similar issue. I haven't tested the full functionality of normal RDDs on my extended version, but so far it is working as expected. It does require some work, and I'm not sure it is the best solution, but what I'm doing is extending the RDD class, reimplementing the methods that return a new RDD by passing their result to the constructor of the new class, and adding my own methods to the class. Here is a short part of the code:

from pyspark.rdd import RDD, PipelinedRDD

class CustomRDD(RDD):
    def __init__(self, rdd, first=True):
        if first:
            rdd = custom_parser(rdd)
        self._jrdd = rdd._jrdd
        self.is_cached = rdd.is_cached
        self.is_checkpointed = rdd.is_checkpointed
        self.ctx = rdd.ctx
        self._jrdd_deserializer = rdd._jrdd_deserializer
        self._id = rdd._id
        self.partitioner = rdd.partitioner

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        # Keyword name matches RDD.mapPartitionsWithIndex so keyword calls still work
        return CustomRDD(PipelinedRDD(self, f, preservesPartitioning), False)

    def union(self, other):
        # Re-wrap the parent's result so that union keeps returning a CustomRDD
        return CustomRDD(super(CustomRDD, self).union(other), False)

    def custom_method(self):
        return CustomRDD(self.filter(lambda x: x.has_property()), False)

The mapPartitionsWithIndex method is called by many other RDD methods, so overriding it covers a lot. There are still many other methods that you have to wrap with your own constructor to keep getting back your own CustomRDD, as I did with union.
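The per-method wrapping could probably also be done in a loop rather than by hand. Below is a minimal sketch of that idea. It uses a hypothetical stand-in class, BaseCollection, instead of pyspark's RDD so that it runs without a Spark installation; BaseCollection, CustomCollection, _rewrap and evens are all illustrative names, and the pattern is untested against a real RDD.

```python
import functools


class BaseCollection:
    """Hypothetical stand-in for pyspark's RDD (not the real API)."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, f):
        return BaseCollection(f(x) for x in self.items)

    def filter(self, f):
        return BaseCollection(x for x in self.items if f(x))

    def collect(self):
        return list(self.items)


def _rewrap(name):
    """Build an override that converts the parent's result back to the subclass."""
    parent_method = getattr(BaseCollection, name)

    @functools.wraps(parent_method)
    def wrapper(self, *args, **kwargs):
        result = parent_method(self, *args, **kwargs)
        return type(self)(result.items)

    return wrapper


class CustomCollection(BaseCollection):
    def evens(self):
        # Custom operator; relies on filter returning a CustomCollection
        return self.filter(lambda x: x % 2 == 0)


# Wrap every listed method that returns a new collection.
for _name in ("map", "filter"):
    setattr(CustomCollection, _name, _rewrap(_name))

result = CustomCollection([1, 2, 3, 4]).map(lambda x: x + 1).evens()
```

Here result is still a CustomCollection after two chained calls. With a real RDD the constructor would have to copy the internal fields as in the CustomRDD code above, and the list of method names would be much longer.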

Upvotes: 0

zero323

Reputation: 330173

Computing an exact median in a distributed environment takes some effort, so let's say you want something simpler, like squaring all values in an RDD. Let's call this method squares and assume it should work as follows:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()

1. Modify pyspark.RDD definition:

from pyspark import RDD

def squares(self):
    return self.map(lambda x: x * x)

RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]

Note: If you modify the class definition, every instance will get access to squares.
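To illustrate the note, here is a small sketch with a hypothetical stand-in class. FakeRDD is not part of pyspark; it only mimics map and collect so the example runs without Spark. A method assigned to the class becomes visible on objects created before the assignment as well as on the results of later transformations.

```python
class FakeRDD:
    """Hypothetical stand-in for pyspark.RDD; only map and collect are mimicked."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, f):
        return FakeRDD(f(x) for x in self.items)

    def collect(self):
        return list(self.items)


def squares(self):
    return self.map(lambda x: x * x)


before = FakeRDD([1, 2, 3])  # created before the class is patched
FakeRDD.squares = squares    # patch the class, as in option 1
after = FakeRDD([4, 5])      # created after the patch

patched_before = before.squares().collect()
patched_after = after.squares().collect()
chained = before.squares().squares().collect()  # results of map are patched too
```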

2. Create RDD subclass:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below

Assigning a class is a dirty hack, so in practice you should create an RDD in a proper way (see, for example, the context.parallelize implementation).

3. Add method to an instance:

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)
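
One consequence is worth keeping in mind: a method bound to a single instance is not carried over to the new objects that transformations return, so calls cannot be chained. The sketch below shows this with a hypothetical stand-in class (FakeRDD is an assumption that only mimics map and collect, so the example runs without Spark). The same behaviour would be expected for a real RDD, since map returns a new object.

```python
import types


class FakeRDD:
    """Hypothetical stand-in for pyspark.RDD; only map and collect are mimicked."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, f):
        return FakeRDD(f(x) for x in self.items)

    def collect(self):
        return list(self.items)


def squares(self):
    return self.map(lambda x: x * x)


rdd = FakeRDD([1, 2, 3])
rdd.squares = types.MethodType(squares, rdd)  # bound to this instance only

first = rdd.squares()                         # works on the patched instance
has_method = hasattr(first, "squares")        # the returned object was never patched
```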

Disclaimer

First of all, I haven't tested any of these long enough to be sure there are no hidden problems.

Moreover, I don't think it is really worth all the fuss. Without static type checking it is really hard to find any benefits, and you can obtain a similar result using functions, currying, and pipes in a much cleaner way.

from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())
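
The currying part can be sketched without extra dependencies. In the example below, pipe is a hand-written function that threads a value through a sequence of functions, which is the behaviour toolz.pipe provides; a plain list stands in for the RDD, and scale and keep_greater_than are hypothetical helpers made up for the illustration.

```python
from functools import partial, reduce


def pipe(data, *funcs):
    """Thread data through funcs from left to right."""
    return reduce(lambda acc, f: f(acc), funcs, data)


def scale(factor, values):
    return [factor * x for x in values]


def keep_greater_than(threshold, values):
    return [x for x in values if x > threshold]


result = pipe(
    [1, 2, 3],
    partial(scale, 10),              # fix the first argument: multiply by 10
    partial(keep_greater_than, 15),  # fix the first argument: keep values > 15
)
```

With an RDD the helpers would call rdd.map and rdd.filter instead of building lists, but the composition would look the same.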

Upvotes: 4
