Reputation: 239
I am trying to run 2 functions doing completely independent transformations on a single RDD in parallel using PySpark. What are some methods to do the same?
def doXTransforms(sampleRDD):
(X transforms)
def doYTransforms(sampleRDD):
(Y Transforms)
if __name__ == "__main__":
sc = SparkContext(appName="parallelTransforms")
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)
rows_rdd = hive_context.sql("select * from tables.X_table")
p1 = Process(target=doXTransforms , args=(rows_rdd,))
p1.start()
p2 = Process(target=doYTransforms, args=(rows_rdd,))
p2.start()
p1.join()
p2.join()
sc.stop()
This does not work and I now understand this will not work. But is there any alternative way to make this work? Specifically are there any python-spark specific solutions?
Upvotes: 19
Views: 17031
Reputation: 330393
Just use threads and make sure that cluster have enough resources to process both tasks at the same time.
from threading import Thread
import time
def process(rdd, f):
def delay(x):
time.sleep(1)
return f(x)
return rdd.map(delay).sum()
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
Arguably this is not that often useful in practice but otherwise should work just fine.
You can further use in-application scheduling with FAIR
scheduler and scheduler pools for a better control over execution strategy.
You can also try pyspark-asyncactions
(disclaimer - the author of this answer is also the author of the package) which provides a set of wrappers around Spark API and concurrent.futures
:
import asyncactions
import concurrent.futures
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()
[x.result() for x in concurrent.futures.as_completed([f1, f2])]
Upvotes: 18