Amol

Reputation: 386

How to do parallel processing in pyspark

I want to do parallel processing in a for loop using PySpark.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = ["a", "b", "c"]


for i in data:
    try:
        df = spark.read.parquet('gs://'+i+'-data')
        df.createOrReplaceTempView("people")
        df2 = spark.sql("""select * from people""")
        df2.show()
    except Exception as e:
        print(e)
        continue

The above script works fine, but I want to do parallel processing in PySpark, the way it is possible in Scala.

Upvotes: 4

Views: 28437

Answers (2)

vaudt

Reputation: 181

Here's a parallel loop in PySpark, using Azure Databricks.

import socket

def getsock(i):
    # Open a UDP socket and report the local IP of the worker that ran
    # this task, so you can see which machines did the work.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    return s.getsockname()[0]

# Distribute ten dummy items and run getsock on them in parallel on the cluster.
rdd1 = sc.parallelize(list(range(10)))
parallel = rdd1.map(getsock).collect()

On platforms other than Azure Databricks you may need to create the Spark context sc yourself; on Databricks the variable exists by default.
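If sc is not defined in your environment, a minimal sketch of getting it from a SparkSession (the app name below is just a placeholder):

from pyspark.sql import SparkSession

# Placeholder app name; reuse your existing SparkSession if you already have one.
spark = SparkSession.builder.appName("parallel-loop").getOrCreate()
sc = spark.sparkContext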

Coding it up like this only makes sense if the code that is executed in parallel (getsock here) is not itself already parallel. For instance, if getsock contained code that processes a PySpark DataFrame, that work is already distributed across the cluster, so it would probably not make sense to "parallelize" the loop around it as well.

Upvotes: 1

Andy_101

Reputation: 1306

Spark itself runs jobs in parallel, but if you still want parallel execution in your driver code you can use simple Python multithreading to do it (this was tested on Databricks only).

data = ["a","b","c"]

from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)


def fun(x):
    try:
        df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
        df.show()
    except Exception as e:
        print(e)

pool.map( fun,data)

I have changed your code a bit, but this is basically how you can run parallel tasks. If you have some flat files that you want to process in parallel, just make a list of their names and pass it to pool.map(fun, data).

Change the function fun as need be.
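For example, here is a minimal sketch of the same pattern applied to your parquet loop; the gs://...-data paths come from your question, spark is assumed to be your existing SparkSession, and read_and_show is just a hypothetical helper name:

from multiprocessing.pool import ThreadPool

data = ["a", "b", "c"]  # dataset names, as in the question

def read_and_show(name):  # hypothetical helper, adapt as needed
    try:
        # Each thread submits its own Spark job; Spark schedules them concurrently.
        df = spark.read.parquet("gs://" + name + "-data")
        df.createOrReplaceTempView("people")
        spark.sql("select * from people").show()
    except Exception as e:
        print(e)

pool = ThreadPool(10)
pool.map(read_and_show, data)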

For more details on the multiprocessing module check the documentation.

Similarly, if you want to do it in Scala you will need the following imports:

import scala.concurrent.{Future, Await}

For a more detailed understanding, check this out. The code is for Databricks, but with a few changes it will work in your environment.

Upvotes: 7
