Reputation: 303
Is it possible to extend Spark's RDDs in Python to add custom operators? If it's not possible, how can one wrap Scala code for a class that extends an RDD, such as the one here: http://blog.madhukaraphatak.com/extending-spark-api/
Edit: I am trying to create a new RDD, say PersonRDD, and add a set of new operators on the PersonRDD, e.g. PersonRDD.computeMedianIncome(). According to the link below, it is not trivial to do that in Python. However, since it's an old thread, I was wondering whether there were any new updates on that. If not, I would like to use Scala to do it, but I am not sure how to call the class from Python using Py4J ( mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/…)
Any advice or help would be greatly appreciated.
Mandy
Upvotes: 5
Views: 3717
Reputation: 1511
I had a similar issue, and while I haven't tested the full functionality of normal RDDs on my extended version, so far it is working as expected. It does require some work, and I'm not sure if this is the best solution, but what I'm doing is simply extending the RDD class, reimplementing the methods that return a new RDD by passing the result to the constructor of the new class, and adding my own methods to the class. Here is a short part of the code:
from pyspark.rdd import RDD, PipelinedRDD

class CustomRDD(RDD):
    def __init__(self, rdd, first=True):
        # Parse the raw input only on the first wrap, then copy the
        # wrapped RDD's internals so this instance behaves like it.
        if first:
            rdd = custom_parser(rdd)
        self._jrdd = rdd._jrdd
        self.is_cached = rdd.is_cached
        self.is_checkpointed = rdd.is_checkpointed
        self.ctx = rdd.ctx
        self._jrdd_deserializer = rdd._jrdd_deserializer
        self._id = rdd._id
        self.partitioner = rdd.partitioner

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        # Re-wrap the result so chained transformations stay a CustomRDD
        return CustomRDD(PipelinedRDD(self, f, preservesPartitioning), False)

    def union(self, other):
        return CustomRDD(super(CustomRDD, self).union(other), False)

    def custom_method(self):
        return CustomRDD(self.filter(lambda x: x.has_property()), False)
The mapPartitionsWithIndex method is called by a lot of other RDD functionality, so overriding it covers a lot, but there are many other methods that you have to wrap with your own constructor to keep getting back your own CustomRDD, like I did with union.
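For illustration, here is a minimal usage sketch (not part of the code above); it assumes custom_parser returns an RDD of objects exposing has_property(), and records is a placeholder for your input data:

# Hypothetical usage; 'records' is an illustrative name
raw_rdd = sc.parallelize(records)      # plain RDD
custom_rdd = CustomRDD(raw_rdd)        # custom_parser runs once on construction
subset = custom_rdd.custom_method()    # returns a CustomRDD, so further custom calls chain
subset.count()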
Upvotes: 0
Reputation: 330173
Computing an exact median in a distributed environment takes some effort, so let's say you want something simpler, like squaring all the values in an RDD. Let's call this method squares and assume it should work as follows:
assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()
One option is to add the method to the pyspark.RDD definition itself:

from pyspark import RDD
def squares(self):
    return self.map(lambda x: x * x)

RDD.squares = squares

rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]
Note: if you modify the class definition, every instance will get access to squares.
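For example, any other RDD created in the same session now exposes the method as well (a small illustration, not part of the original answer):

other_rdd = sc.parallelize(range(4))
# squares is available because it was added to the RDD class itself
assert other_rdd.squares().collect() == [0, 1, 4, 9]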
Another option is to create an RDD subclass:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares  # WARNING: see the comment below
Assigning a class like this is a dirty hack, so in practice you should create an RDD in a proper way (see for example the context.parallelize implementation).
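As a rough sketch of what "a proper way" could look like (this is an assumption on my part, not from the original answer), you can wrap an existing RDD in the subclass and reuse the parent constructor instead of reassigning __class__; the attributes used rely on pyspark internals and may change between versions, and plain transformations like map will still return ordinary RDDs:

from pyspark import RDD

class SquaresRDD(RDD):  # hypothetical name
    def __init__(self, rdd):
        # Reuse the parent constructor with the wrapped RDD's internals
        super(SquaresRDD, self).__init__(rdd._jrdd, rdd.ctx, rdd._jrdd_deserializer)

    def squares(self):
        return self.map(lambda x: x * x)

rdd = SquaresRDD(sc.parallelize([1, 2, 3]))
assert rdd.squares().collect() == [1, 4, 9]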
Finally, you can add the method to a single instance:

import types

rdd = sc.parallelize([1, 2, 3])

# Reusing the squares function defined above
rdd.squares = types.MethodType(squares, rdd)
First of all, I haven't tested any of these long enough to be sure there are no hidden problems.
Moreover, I don't think it is really worth all the fuss. Without static type checking it is really hard to find any benefits, and you can obtain a similar result using functions, currying, and pipes in a much cleaner way:
from toolz import pipe

pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())
Upvotes: 4