Meethu Mathew

Reputation: 431

How to run multiple jobs in one SparkContext from separate threads in PySpark?

The Spark documentation on Scheduling Within an Application states:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

I could find only a few code examples of this in Scala and Java. Can somebody give an example of how this can be implemented using PySpark?

Upvotes: 28

Views: 42750

Answers (3)

Zach

Reputation: 958

ThreadPool is convenient, but it can cause unexpected behavior. For example, if all threads write DataFrames to the same location in overwrite mode, whether one thread overwrites another thread's files depends on timing. In effect it is "first come, first served".

Normally all threads are evaluated/materialized at about the same time, so the location ends up containing the files from every thread (as if they had all used append mode). But if some threads are delayed, they can certainly overwrite files written by other threads.
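
One way to sidestep this race is to give each thread its own output directory, so that overwrite mode only ever touches that thread's files. The sketch below assumes every job produces a DataFrame and uses the standard `df.write.mode("overwrite").parquet(path)` writer chain. The helper names `job_output_path`, `write_job`, and `write_all`, and the `job=<id>` directory layout, are hypothetical and chosen only for illustration.

```python
import posixpath
from multiprocessing.pool import ThreadPool


def job_output_path(base_path, job_id):
    """Build a per-job subdirectory so that no two threads share a target."""
    return posixpath.join(base_path, "job={}".format(job_id))


def write_job(df, base_path, job_id):
    """Write one DataFrame to its own directory and return the path used."""
    path = job_output_path(base_path, job_id)
    # Overwrite now only replaces this job's directory, not the shared base.
    df.write.mode("overwrite").parquet(path)
    return path


def write_all(dataframes, base_path, threads=4):
    """Write every DataFrame from a separate thread; return the paths in input order."""
    pool = ThreadPool(processes=threads)
    try:
        jobs = [(df, base_path, i) for i, df in enumerate(dataframes)]
        return pool.starmap(write_job, jobs)
    finally:
        pool.close()
        pool.join()
```

Whether separate directories fit depends on how the data is read back later; this is one possible layout, not the only way to avoid the conflict.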

Upvotes: -1

sparknoob

Reputation: 1286

I was running into the same issue, so I created a small self-contained example. It creates multiple threads using Python's threading module and submits multiple Spark jobs simultaneously.

Note that by default, Spark runs jobs in first-in, first-out (FIFO) order: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. In the example below, I change it to FAIR scheduling.

# Prereqs:
# set 
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
# spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  # count() is the action that triggers one Spark job per call
  print(sc.parallelize(range(i*10000)).count())

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  # Submit each job from its own thread so the jobs can run concurrently
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print('spark task', i, 'has started')


run_multiple_jobs()

Output:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0
10000
20000
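
The example above prints each count from its worker thread. A possible variation, sketched here under the assumption of Python 3, uses `concurrent.futures.ThreadPoolExecutor` to wait for every job and return the counts to the caller. `count_range` and `run_jobs` are hypothetical helper names, and `sc` is assumed to be an already created SparkContext.

```python
from concurrent.futures import ThreadPoolExecutor


def count_range(sc, i):
    """Run one Spark job (an action) and return its result instead of printing it."""
    return sc.parallelize(range(i * 10000)).count()


def run_jobs(sc, n_jobs=4):
    """Submit n_jobs Spark actions from separate threads and gather the counts."""
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        futures = [executor.submit(count_range, sc, i) for i in range(n_jobs)]
        # result() blocks until the corresponding job has finished,
        # so the list follows submission order, not completion order.
        return [f.result() for f in futures]
```

Because the futures are read back in the order they were submitted, the returned list is ordered by `i` even when the jobs finish in a different order, unlike the printed output.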

Upvotes: 26

Harald Schilly

Reputation: 1108

I was asking myself the same question today. The multiprocessing module offers a ThreadPool, which spawns a few threads for you and hence runs the jobs in parallel. First define the function, then create the pool, and then map the function over the range you want to iterate over.

In my case, I was calculating WSSSE (Within Set Sum of Squared Errors) values for different numbers of centers (hyperparameter tuning) to get a "good" k-means clustering, just as outlined in the Spark MLlib documentation. Without further explanation, here are some cells from my IPython worksheet:

from pyspark.mllib.clustering import KMeans
import numpy as np

c_points is an RDD of 12-dimensional arrays:

>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([-2,  0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([ 7, -1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0])]

The following computes the WSSSE value for each i and returns it as a tuple (i, WSSSE):

def error(point, clusters):
    center = clusters.centers[clusters.predict(point)]
    return np.linalg.norm(point - center)

def calc_wssse(i):
    clusters = KMeans.train(c_points, i, maxIterations=20,
        runs=20, initializationMode="random")
    WSSSE = c_points\
        .map(lambda point: error(point, clusters))\
        .reduce(lambda x, y: x + y)
    return (i, WSSSE)

Here is where the interesting part starts:

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)

Run it:

wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points

gives:

[(1, 195318509740785.66),
 (2, 77539612257334.33),
 (3, 78254073754531.1),
 ...
]
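
Note that `ThreadPool.map` returns its results in the order of the input, even when the underlying jobs finish out of order, which is why the tuples come back sorted by i. A small Spark-free sketch of that behavior follows; `slow_square` is a hypothetical stand-in for `calc_wssse`, with artificial delays so that later inputs finish first.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(i):
    """Stand-in for a Spark job: smaller inputs sleep longer, so they finish last."""
    time.sleep(0.05 * (4 - i))
    return (i, i * i)


pool = ThreadPool(processes=4)
try:
    # map() blocks until all calls are done and keeps the input order.
    results = pool.map(slow_square, range(1, 5))
finally:
    pool.close()
    pool.join()

print(results)
```

If results are wanted as soon as each job completes instead, `ThreadPool.imap_unordered` is the variant to look at.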

Upvotes: 8
