Ben

Reputation: 305

Multiple Python workers (or worker threads) in PySpark?

In PySpark, I understand that Python workers are used to perform at least some of the computation on the worker nodes (as described at https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals).

In my test setup, I'm trying to get Spark to use 4 worker threads (on a standalone machine), but it seems like only 1 python worker is created:

import threading

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[4]')\
    .appName("PythonPi")\
    .getOrCreate()

partitions = 4

# Print the ident of the local thread:
print(str(threading.get_ident()))

# Print the idents of the threads inside the python workers:
thread_ids = spark.sparkContext.parallelize(range(1, partitions + 1), partitions)\
    .map(lambda x: ' threadid: ' + str(threading.get_ident())).collect()


print(thread_ids)

spark.stop()

Output:

140226126948096
[' threadid: 139948131018496', ' threadid: 139948131018496', ' threadid: 139948131018496', ' threadid: 139948131018496']

Looking at these thread IDs, it seems that the same Python thread (in the same worker) was used to process all partitions. Or is that code being evaluated outside the Python workers?

Is there some other way to access an ID for the Python workers, so I can understand where the code is actually running?

Upvotes: 6

Views: 3345

Answers (1)

Alper t. Turker

Reputation: 35229

Your mistake is to believe that PySpark uses threading. It does not. It uses processes, and thread ids are, in general, unique only within a process (and can be reused).

So your code should be:

import os

(spark.sparkContext.range(partitions)
    .map(lambda x: 'pid: {}'.format(os.getpid()))
    .collect())

# ['pid: 749', 'pid: 755', 'pid: 753', 'pid: 758']
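
To make the distinction visible, here is a minimal sketch (reusing the spark session and the partitions variable from the question) that collects both the process id and the thread id for each partition; thread ids that look identical can still belong to different worker processes:

import os
import threading

# Collect both the OS process id and the in-process thread id for each task;
# identical-looking thread ids can come from different worker processes.
pid_and_thread = (spark.sparkContext
    .parallelize(range(partitions), partitions)
    .map(lambda x: 'pid: {} threadid: {}'.format(os.getpid(), threading.get_ident()))
    .collect())

print(pid_and_thread)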

Upvotes: 2
