Joep Atol

Reputation: 63

Spark: forcing each task on a separate executor

Suppose we have a Spark DataFrame of 20 rows. I'm applying a PySpark UDF to each row that performs some expensive calculation.


import datetime
from typing import Callable

import pandas as pd
from pyspark.broadcast import Broadcast


def expensive_python_function(df: pd.DataFrame, a, b) -> pd.DataFrame:
    return ...

def create_udf(a: Broadcast, b: Broadcast, func: Broadcast) -> Callable:
    def my_udf(df: pd.DataFrame) -> pd.DataFrame:
        result = func.value(df, a.value, b.value)
        result["timestamp"] = datetime.datetime.now()
        return result
    return my_udf

# a, b, sdf, groups and schema are defined elsewhere
broadcast_func = spark.sparkContext.broadcast(expensive_python_function)
broadcast_a = spark.sparkContext.broadcast(a)
broadcast_b = spark.sparkContext.broadcast(b)

result = sdf.groupby(*groups).applyInPandas(
    create_udf(broadcast_a, broadcast_b, broadcast_func),
    schema=schema,
)

result.show()

To clarify, each unique group in the groupby will result in a dataframe of one row.

The variables a and b are used by each executor and are the same for all of them. I am accessing the variables in my_udf using broadcast_a.value.

Problem

This operation results in 2 partitions and thus 2 tasks. Both tasks are executed on a single (the same) executor. Obviously that is not what I want; I would like each task to run on a separate executor in parallel.
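For reference, this is roughly how I check the partition count (a minimal sketch; sdf is the DataFrame from above):


# Each partition becomes one task in the applyInPandas stage.
print(sdf.rdd.getNumPartitions())  # prints 2 in my case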

What I tried

I repartitioned the DataFrame into 20 partitions and used persist to cache it in memory.


sdf = sdf.repartition(20).persist()


result = sdf.groupby(*groups).applyInPandas(
      create_udf(broadcast_a, broadcast_b, broadcast_func), 
      schema=schema
)

result.show()

This indeed gives me 20 partitions and 20 tasks to complete. However, of the 10 executors, only 1 is actually active.

[Screenshot of the Spark UI executors page: 10 executors, only one of them running tasks]

I also noticed that each executor does hold an RDD block, which puzzles me as well.

Question

It seems to me like the Spark driver is deciding for me that all tasks can be run on one executor, which makes sense from a big data point of view. I realize that Spark is not exactly intended for my use case; I'm testing if and what kind of speedup I can achieve as opposed to using something like Python multiprocessing.

Is it possible to force each task to be run on a separate executor, regardless of the size of the data or the nature of the task?

I'm using Python 3.9 and Spark 3.2.1.

Upvotes: 3

Views: 1197

Answers (1)

Joep Atol

Reputation: 63

So, the solution lay in not using the DataFrame API. Working with RDDs seems to give you much more control.


from dataclasses import dataclass
from typing import Callable

params = [(1, 2), (3, 4), (5, 6)]

@dataclass
class Task:
    func: Callable
    a: int
    b: int

def run_task(task: Task):
    return task.func(task.a, task.b)

# one partition per parameter set, so each task can land on its own executor
data = spark.sparkContext.parallelize(
    [Task(expensive_python_function, a, b) for a, b in params],
    len(params),
)

result = data.map(run_task)

This returns an RDD, so you either need to convert it to a DataFrame or use collect() to bring the results back to the driver.
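For example (a minimal sketch; whether createDataFrame works directly depends on what expensive_python_function returns, so adjust to your actual return type and schema):


# Option 1: collect the results to the driver as plain Python objects.
rows = result.collect()

# Option 2: build a DataFrame from the RDD (assumes each element matches `schema`).
result_df = spark.createDataFrame(result, schema=schema)
result_df.show()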

To be sure, I also set spark.default.parallelism to str(len(params)) and spark.executor.instances to str(len(params)). I believe the parallelism setting should not be strictly necessary, since you are already passing the number of partitions to parallelize.
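For completeness, this is roughly how those settings can be applied when building the session (a sketch with illustrative values; depending on your cluster manager you may also need to disable dynamic allocation, which was not part of my original setup):


from pyspark.sql import SparkSession

params = [(1, 2), (3, 4), (5, 6)]

spark = (
    SparkSession.builder
    .appName("one-task-per-executor")
    .config("spark.executor.instances", str(len(params)))
    .config("spark.default.parallelism", str(len(params)))
    # assumption on my side: keeps the executor count fixed
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate()
)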

Hope it helps someone!

Upvotes: 1
