Reputation: 386
I want to do parallel processing in a for loop using PySpark.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
data = ["a", "b", "c"]  # bucket name fragments must be strings
for i in data:
    try:
        df = spark.read.parquet('gs://' + i + '-data')
        df.createOrReplaceTempView("people")
        df2 = spark.sql("""select * from people""")
        df2.show()
    except Exception as e:
        print(e)
        continue
The script above works fine, but I want to run these iterations in parallel in PySpark, the way it is possible in Scala.
Upvotes: 4
Views: 28437
Reputation: 181
Here's a parallel loop in PySpark, run on Azure Databricks.
import socket

def getsock(i):
    # Open a UDP socket toward a public address just to discover the
    # local IP of the worker this task runs on; no traffic is sent.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    return s.getsockname()[0]

rdd1 = sc.parallelize(list(range(10)))   # ten elements -> up to ten parallel tasks
parallel = rdd1.map(getsock).collect()   # getsock is applied in parallel on the executors
On platforms other than Azure Databricks you may need to create the Spark context sc yourself; on Databricks the variable exists by default.
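If you do need to create it yourself, a minimal sketch would be the following (the app name is just a placeholder):
# Minimal sketch, assuming a plain PySpark environment; the app name is arbitrary.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parallelLoop").getOrCreate()
sc = spark.sparkContext  # the sc that rdd1 = sc.parallelize(...) above relies on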
Coding it up like this only makes sense if the code that is executed in parallel (getsock here) is not itself already parallel. For instance, if getsock contained code that goes through a PySpark DataFrame, that code would already run in parallel, so it would probably not make sense to also "parallelize" the surrounding loop.
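For illustration, a sketch like the following (reusing the asker's bucket path as an assumption) is already distributed: Spark splits the read and the count over the DataFrame's partitions, so there is nothing left for rdd.map to parallelize.
# Already parallel by itself; the path is borrowed from the question.
df = spark.read.parquet('gs://a-data')
print(df.count())  # executed in parallel across partitions by Spark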
Upvotes: 1
Reputation: 1306
Spark itself runs jobs in parallel, but if you still want parallel execution in your driver code you can use plain Python parallel-processing tools to get it (this was tested on Databricks only).
data = ["a","b","c"]
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)
def fun(x):
try:
df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
df.show()
except Exception as e:
print(e)
pool.map( fun,data)
I have changed your code a bit, but this is basically how you can run parallel tasks. If you have some flat files that you want to process in parallel, just make a list of their names and pass it into pool.map(fun, data), changing the function fun as needed; see the sketch below.
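As a hedged sketch of that adaptation, here is the parquet loop from the question moved into the same ThreadPool pattern (the bucket paths and pool size are assumptions):
from multiprocessing.pool import ThreadPool

data = ["a", "b", "c"]  # bucket name fragments, as in the question

def fun(name):
    # Each thread submits its own Spark job; Spark schedules them concurrently.
    try:
        df = spark.read.parquet('gs://' + name + '-data')
        df.show()
    except Exception as e:
        print(e)

pool = ThreadPool(3)  # pool size is an arbitrary choice
pool.map(fun, data)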
For more details on the multiprocessing module check the documentation.
Similarly, if you want to do it in Scala you will need the following import:
import scala.concurrent.{Future, Await}
For a more detailed understanding, check this out. The code is for Databricks, but with a few changes it will work in your environment.
Upvotes: 7