ASHISH M.G

Reputation: 802

Concurrency in PySpark

I have a PySpark program with a function that accepts a string as a parameter. The strings come from an array, so I loop over the array, call the function for each element, and append the string it returns to another array.

The function contains a series of if/elif branches that check the argument and choose which block to execute. The blocks are independent of each other and share only a global cached DataFrame and a global Spark session.

I want the function calls to run concurrently rather than one after another (FIFO), which is what happens now. Which is the better option in Python for this?

It would be helpful if example code could be provided!

My example pseudocode:

x = ["A", "B", "C"]
y = []

def test(z):
    global spark_session, cached_dataframe  # shared by every branch
    if z == "A":
        pass  # ------- SOME CODE FOR "A" -------
    elif z == "B":
        pass  # ------- SOME CODE FOR "B" -------
    elif z == "C":
        pass  # ------- SOME CODE FOR "C" -------

for i in x:
    y.append(test(i))

If concurrency is not possible here, can you suggest a better way to organise my code, such as avoiding the if/else chain? With my current requirements, this if/else block is going to keep growing!

Upvotes: 2

Views: 7423

Answers (1)

danielcahall

Reputation: 2742

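The first thought would be to change the scheduling mode from 'FIFO' to 'FAIR'. As far as I know, this property is read when the SparkContext starts, so it should be set before the session is first created rather than on a running one:

spark = SparkSession.builder.config('spark.scheduler.mode', 'FAIR').getOrCreate()

(where spark becomes your SparkSession object). Fair scheduling only comes into play when jobs are submitted from separate threads; a plain loop still submits them one at a time.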

More about configuring the scheduler is here: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

I don't think multiprocessing would make sense here, since this is about the scheduling of Spark jobs (the compute-heavy work is presumably being done by Spark, not by the Python driver). The other idea would be to use a queue with multiple threads:

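from queue import Queue, Empty
from threading import Thread

def process_queue(queue, func, num_workers=5):
    results = []

    def process_elements():
        while True:
            try:
                item = queue.get(timeout=1)
            except Empty:
                break  # queue drained, let this worker exit
            try:
                results.append(func(item))  # list.append is thread-safe in CPython
            finally:
                queue.task_done()  # always mark the item done so queue.join() cannot hang

    threads = [Thread(target=process_elements) for _ in range(num_workers)]
    for t in threads:
        t.start()
    queue.join()  # wait until every queued item has been processed
    for t in threads:
        t.join()
    return results

queue = Queue()
for i in x:
    queue.put(i)
y = process_queue(queue, test)  # results are in completion order, not the order of x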

You could also probably do something with the ThreadPoolExecutor in the concurrent.futures module (https://docs.python.org/3/library/concurrent.futures.html):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(5) as pool:
    y = list(pool.map(test, x))  # applies test to every element of x; results keep the order of x

Or even:

with ThreadPoolExecutor(5) as pool:
    futures = [pool.submit(test, e) for e in x]
    y = [f.result() for f in futures]  # result() blocks until that call has finished

and take advantage of the Future objects that the executor returns. Since I'm not familiar with the requirements of your application, I'm not sure how much this would benefit you, but I hope I've laid out a few potentially useful approaches using multithreading. I've personally used both approaches with Spark applications and saw performance improvements.

Upvotes: 3
