Dave

Reputation: 2049

Pyspark SQL Pandas Grouped Map without GroupBy?

I have a dataset that I want to map over using several Pyspark SQL Grouped Map UDFs at different stages of a larger ETL process that runs on ephemeral clusters in AWS EMR. The Grouped Map API requires that the Pyspark dataframe be grouped prior to the apply, but I have no actual need to group by any key.

At the moment, I'm using an arbitrary grouping, which works, but results in:

  1. An unnecessary shuffle.

  2. Hacky code for an arbitrary groupby in each job.

My ideal solution would allow a vectorized Pandas UDF apply without any arbitrary grouping, but even a solution that keeps the arbitrary grouping while eliminating the shuffle would be an improvement.
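For reference, the hacky arbitrary grouping I mean looks roughly like this (the column name and bucket count are made up, and transform stands for any Grouped Map pandas UDF; the key carries no meaning and only exists to satisfy the groupBy):

from pyspark.sql import functions as F

# Sketch of the hack: assign every row a meaningless bucket id purely so
# that groupBy(...).apply(...) can be called. The bucket count is arbitrary.
n_buckets = 200
(sql
  .read.parquet(a_path)
  .withColumn("gid", F.monotonically_increasing_id() % n_buckets)
  .groupBy("gid")
  .apply(transform)
  .write.parquet(b_path))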

EDIT:

Here's what my code looks like. I was originally using an arbitrary grouping, but am currently trying spark_partition_id() based on a comment below by @pault.


from pyspark.sql.functions import pandas_udf, spark_partition_id, PandasUDFType

@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
    # Drop the synthetic grouping column before transforming.
    b = a_partition.drop("pid", axis=1)
    # Some other transform stuff
    return b

(sql
  .read.parquet(a_path)
  .withColumn("pid", spark_partition_id())
  .groupBy("pid")
  .apply(transform)
  .write.parquet(b_path))

Using spark_partition_id() seems to still result in a shuffle. I get the following DAG:

Stage 1

  1. Scan parquet
  2. Project
  3. Project
  4. Exchange

Stage 2

  1. Exchange
  2. Sort
  3. FlatMapGroupsInPandas

Upvotes: 12

Views: 2143

Answers (1)

10465355

Reputation: 4631

To support roughly equivalent logic (a function of type pandas.core.frame.DataFrame -> pandas.core.frame.DataFrame) you'll have to switch to Spark 3.0.0 and use the MAP_ITER transformation.

In the latest preview version (3.0.0-preview2) you'll need a UDF:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
    # dfs is an iterator of pandas DataFrames, one or more per partition.
    for df in dfs:
        b = df.drop("pid", axis=1)
        ...
        yield b

df.mapInPandas(transform)

and in the upcoming 3.0.0 release (SPARK-28264) just a plain function:

def transform(dfs):
    for df in dfs:
        b = df.drop("pid", axis=1)
        # Some other transform stuff
        ...
        yield b

df.mapInPandas(transform, b_schema)
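Plugged into the pipeline from the question this becomes a plain map with no grouping column at all (a sketch reusing the question's sql, a_path, b_path and b_schema; the pid bookkeeping in transform can go away, since nothing is grouped anymore):

# Sketch: mapInPandas operates on whole partitions, so no synthetic
# grouping key is needed and no shuffle is introduced.
(sql
  .read.parquet(a_path)
  .mapInPandas(transform, b_schema)
  .write.parquet(b_path))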

A possible workaround on 2.x would be to use a plain SCALAR UDF, serialize each row of the result as JSON, and deserialize it on the other side, i.e.:

import json

import pandas as pd
from pyspark.sql.functions import from_json, pandas_udf, PandasUDFType

@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
    b = pd.DataFrame({"x": col1, "y": col2})
    ...
    # Serialize each row of the result to a JSON string.
    return b.apply(lambda x: json.dumps(dict(zip(b.columns, x))), axis=1)


(df
    .withColumn("json_result", transform("col1", "col2"))
    .withColumn("a_struct", from_json("json_result", b_schema)))
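To get the individual columns back out afterwards you can flatten the parsed struct (a sketch, assuming b_schema is a StructType so a_struct has named fields):

# Hypothetical continuation: expand the parsed struct into top-level
# columns and drop the intermediate JSON string.
(df
    .withColumn("json_result", transform("col1", "col2"))
    .withColumn("a_struct", from_json("json_result", b_schema))
    .select("a_struct.*"))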

Upvotes: 10
