Reputation: 101
I am trying to call Python functions that use PySpark RDD methods and are time-consuming, which blocks my application. I need to write this in an async fashion so that my app doesn't get blocked. Here is a miniature version of what I actually want to do.
from concurrent.futures import Future
from pyspark import SparkContext
sc = SparkContext()
def add(a, b):
    f = Future()
    c = a + b
    d = a * b
    t = (c, d)
    rdd = sc.parallelize([t])
    f.set_result(rdd)
    # return rdd

if __name__ == '__main__':
    f1 = add(90, 8)
    f2 = add(8, 89)
    while (not f1.done()) and (not f2.done()):
        pass
    print(f1.result())
    print(f2.result())
I know the above code won't work straight away. How can I modify it so that it will work?
Upvotes: 7
Views: 7951
Reputation: 1508
Use the threading module. I just finished a similar project and it worked like a charm.
import threading
new_thread = threading.Thread(
    target=<function here>,
    args=(<function args>),
    name=<thread name>,
)
new_thread.start()
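For example, here is a minimal sketch of how those placeholders could be filled in for the add function from the question (the results dict and the thread name are my own illustrative choices, not part of the original code):
import threading

from pyspark import SparkContext

sc = SparkContext()
results = {}

def add(key, a, b):
    # Do the Spark work inside the thread so the main thread is not blocked.
    rdd = sc.parallelize([(a + b, a * b)])
    results[key] = rdd.collect()

new_thread = threading.Thread(
    target=add,
    args=("f1", 90, 8),
    name="add-job-f1",
)
new_thread.start()
# The main thread keeps running; call new_thread.join() when the result is needed.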
That covers the core functionality. Below is a more elaborate example that queues up a job, has it wait its turn behind other jobs (via the thread.join() method), and returns a response with the number of jobs (threads) already in line for processing.
import uuid

current_jobs = []
for t in threading.enumerate():
    if t.name in (<thread name>, <thread name>):
        if t.is_alive():
            current_jobs.append(t)

new_thread = threading.Thread(
    target=<function here>,
    args=(<function args here>, current_jobs),
    name=<thread name here>,
)
new_thread.start()

# Create message.
job_id = uuid.uuid4().hex
job_message = "Job Id: " + job_id

# Check current job count.
job_count = len(current_jobs)
if job_count > 0:
    # Extend message if any jobs were found.
    job_message += "\nThere are " + str(job_count) + " ahead of you, so please be patient."

# Return the response (assumes this runs inside a Flask view function).
return app.make_response(job_message), 200
And pass current_jobs as an argument to your function, with this code executing at the beginning:
for j in current_jobs:
    j.join()
It is important to note that your function should contain the entire logic: creating your SparkContext, the lazy work on your RDDs/DataFrames, and any actual work you want the cluster to do (.collect(), submitting to a database, etc.), because the thread is asynchronous and has its own scope that would be complicated to transfer back to the main thread.
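As a rough sketch of such a self-contained worker function (the name process_job, the sample transformation, and the try/finally handling are my own assumptions, not taken from the answer above):
from pyspark import SparkContext

def process_job(data, current_jobs):
    # Wait for the jobs queued ahead of this one, as described above.
    for j in current_jobs:
        j.join()

    # Everything Spark-related stays inside the thread's own scope:
    # context creation, the lazy transformations, and the final action.
    sc = SparkContext(appName="async-job")
    try:
        rdd = sc.parallelize(data).map(lambda x: x * 2)  # lazy work
        result = rdd.collect()                           # the action that actually runs
        # ... submit `result` to a database, write it out, etc.
    finally:
        sc.stop()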
Upvotes: 3
Reputation: 15258
I think you should just return f in your function:
def add(a, b):
    f = Future()
    c = a + b
    d = a * b
    t = (c, d)
    rdd = sc.parallelize([t])
    f.set_result(rdd)
    return f
But don't forget that your RDD is lazy. With no action, it should not consume that much time.
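If an action does have to run without blocking the caller, one option (my own sketch, not part of the answer above) is to let concurrent.futures create the Future by submitting the work to a ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor

from pyspark import SparkContext

sc = SparkContext()
executor = ThreadPoolExecutor(max_workers=2)

def add(a, b):
    # The action (.collect()) runs inside the worker thread, so the
    # caller gets a Future back immediately instead of blocking.
    return sc.parallelize([(a + b, a * b)]).collect()

if __name__ == '__main__':
    f1 = executor.submit(add, 90, 8)
    f2 = executor.submit(add, 8, 89)
    print(f1.result())  # blocks only when the result is actually needed
    print(f2.result())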
Upvotes: 2