Reputation: 487
I'm trying to transform a Spark dataframe with 10k rows using mapInPandas, a function introduced in Spark 3.0 (I'm on 3.0.1).
Expected output: the mapped pandas_function() transforms one row into three, so the output transformed_df should have 30k rows.
Current output: I'm getting 3 rows with 1 core and 24 rows with 8 cores.
INPUT: respond_sdf has 10k rows
|url |content |
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
only showing top 20 rows
Input respond_sdf has 10000 rows
OUTPUT A) 3 rows - with 1 core - .master('local[1]')
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
| api| A| B|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
Output transformed_df has 3 rows
OUTPUT B) 24 rows - with 8 cores - .master('local[8]')
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
| api| A| B|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
only showing top 20 rows
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
Output transformed_df has 24 rows
Example Code:
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[1]') \
    .getOrCreate()
sc = spark.sparkContext

####### INPUT DATAFRAME WITH LIST OF JSONS ########################
# Create list with 10k nested tuples (url, content)
rdd_list = [('api_1', "{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
            ('api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]*5000

schema = StructType([
    StructField('url', StringType(), True),
    StructField('content', StringType(), True)
])

# Create input dataframe with 10k rows
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
print(f'Input respond_sdf has {respond_sdf.count()} rows')

####### TRANSFORMATION DATAFRAME ########################
# Pandas transformation function returning pandas dataframe
def pandas_function(iter):
    for df in iter:
        # Only the first row of each incoming dataframe is read here
        yield pd.DataFrame(eval(df['content'][0]))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
print(f' Output transformed_df has {transformed_df.count()} rows')
print(f'Expected output dataframe should have 30k rows')
Link to related discussion: How to yield pandas dataframe rows to spark dataframe
Upvotes: 4
Views: 5528
Reputation: 61
There is actually a tool that lets you stop inside a UDF and debug it in VSCode: check out the pyspark_xray library. Its demo app shows how to use pyspark_xray's wrapper_sdf_mapinpandas function to step into Pandas UDFs that are passed into mapInPandas.
Upvotes: 2
Reputation: 42422
Sorry, the part of my answer to your previous question that uses mapInPandas was incorrect. I think the function below is the correct way to write the pandas function. I made a mistake last time because I thought iter was an iterable of rows, but it's actually an iterable of dataframes, so df['content'][0] only picks up the first row of each dataframe.
def pandas_function(iter):
    for df in iter:
        # df is a whole pandas dataframe: expand every row, then stack the results
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))
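To see where the 3 and 24 come from, here is a minimal sketch in plain pandas, no Spark needed. It assumes each core receives its share of the input as a single dataframe, which is what the reported counts suggest (3 rows per dataframe, times 1 or 8 dataframes). Spark can also split a partition into several batches, controlled by spark.sql.execution.arrow.maxRecordsPerBatch, so the real batch count may differ. The helper names make_batches, first_row_only, every_row and total_rows are made up for this illustration, and I swapped eval for ast.literal_eval, which only parses literals and is the safer choice for untrusted strings.

```python
import ast

import pandas as pd

ROW_A = "{'api': ['api_1', 'api_1', 'api_1'], 'A': [1, 2, 3], 'B': [4, 5, 6]}"
ROW_B = "{'api': ['api_2', 'api_2', 'api_2'], 'A': [7, 8, 9], 'B': [10, 11, 12]}"


def make_batches(n_rows, n_batches):
    """Hypothetical stand-in for Spark: split the input into n_batches dataframes."""
    full = pd.DataFrame({"content": [ROW_A, ROW_B] * (n_rows // 2)})
    size = -(-n_rows // n_batches)  # ceiling division
    for start in range(0, n_rows, size):
        # Each batch gets a fresh index starting at 0
        yield full.iloc[start:start + size].reset_index(drop=True)


def first_row_only(batches):
    """The function from the question: reads only row 0 of each batch."""
    for df in batches:
        yield pd.DataFrame(ast.literal_eval(df["content"][0]))


def every_row(batches):
    """The corrected function: expands every row of each batch."""
    for df in batches:
        yield pd.concat([pd.DataFrame(x) for x in df["content"].map(ast.literal_eval)])


def total_rows(func, n_rows, n_batches):
    """Count the output rows produced by func over all batches."""
    return sum(len(out) for out in func(make_batches(n_rows, n_batches)))


if __name__ == "__main__":
    for n_batches in (1, 8):
        print(n_batches, "batch(es):",
              total_rows(first_row_only, 10_000, n_batches), "rows (question),",
              total_rows(every_row, 10_000, n_batches), "rows (corrected)")
```

With the version from the question the output size depends only on the number of batches (3 rows each), while the corrected version returns three rows per input row regardless of how the input is split.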
(PS Thanks to answer from here.)
Upvotes: 4