Reputation: 331
I want to use data.groupby().apply() to apply a function to each row of my PySpark DataFrame, per group.
I used the Grouped Map Pandas UDFs. However, I can't figure out how to pass another argument to my function.
I tried using the argument as a global variable, but the function doesn't recognize it (my argument is a PySpark DataFrame).
I also tried the solutions proposed in this question (for a pandas DataFrame): Use Pandas groupby() + apply() with arguments
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def function(key, data, interval):
    interval_df = interval.filter(interval["var"] == key).toPandas()
    for value in interval_df:
        # Apply some operations

return Data.groupBy("msn").apply(calc_diff, ('arg1'))
Or
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def function(key, data, interval):
    interval_df = interval.filter(interval["var"] == key).toPandas()
    for value in interval_df:
        # Apply some operations

return Data.groupBy("msn").apply(lambda x: calc_diff(x, 'arg1'))
But I get the error:
ValueError: Invalid function: pandas_udfs with function type GROUPED_MAP must take either one argument (data) or two arguments (key, data).
Could anyone help me with the above issue?
Thanks
Upvotes: 23
Views: 20361
Reputation: 3564
All the answers seemed useful, but none gave a formal description of what is happening. So I took bits and pieces from all of them (specifically from @sifta) and tried to explain it here. Maybe it will help someone in the future.
Let's say I have a PySpark DataFrame as follows:
# test = pd.DataFrame({
#     'c1': ['a', 'a', 'b', 'b', 'b'],
#     'c2': ['a1', 'a2', 'b1', 'b1', 'b2']})
# test = spark.createDataFrame(test)
+---+---+
| c1| c2|
+---+---+
|  a| a1|
|  a| a2|
|  b| b1|
|  b| b1|
|  b| b2|
+---+---+
My aim is to create another column c3, which will be the group count plus some fixed value. Okay, this is definitely not the best example, but let's try to solve it using groupby. We need to pass an argument (the fixed value), which is not directly supported.
So, following the answers, we can come up with
import pyspark.sql.types as t
import pyspark.sql.functions as f

schema = t.StructType([
    t.StructField('c1', t.StringType()),
    t.StructField('c2', t.StringType()),
    t.StructField('c3', t.IntegerType()),
])

def fn_wrapper(df, val):

    @f.pandas_udf(schema, f.PandasUDFType.GROUPED_MAP)
    def fn(pdf):
        pdf['c3'] = pdf.shape[0] + val
        return pdf

    return df.groupby('c1', 'c2').apply(fn)

fn_wrapper(test, 7).show()
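For reference, with the toy data above the counts per (c1, c2) group are 1, 1, 2 and 1, so fn_wrapper(test, 7).show() should produce roughly the following (row order may vary):

+---+---+---+
| c1| c2| c3|
+---+---+---+
|  a| a1|  8|
|  a| a2|  8|
|  b| b1|  9|
|  b| b1|  9|
|  b| b2|  8|
+---+---+---+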
But what exactly does it mean?
We have the schema, which is used to map the pandas DataFrame returned by fn (the return pdf statement).
So, let's understand how the trick works. We have defined a normal function called fn_wrapper that takes the PySpark DataFrame and the argument to be used inside the grouped UDF. We call it with fn_wrapper(test, 7).show(). Inside fn_wrapper, the body of fn is merely defined at that point; it is not executed yet.
Next, the statement return df.groupby('c1', 'c2').apply(fn) is executed. See, the function fn is defined as a Pandas UDF and takes no extra arguments. However, fn is defined inside the scope of fn_wrapper, where val is available, so when apply runs it we can simply refer to val in pdf['c3'] = pdf.shape[0] + val, where the grouped data is present as a pandas DataFrame.
I hope this helps to understand wrapping a function inside a function, and how to use a Pandas UDF with arguments.
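To make the closure explicit, here is a minimal variation (a sketch, not part of the original answer) showing that the wrapper can capture any number of extra arguments; prefix is a hypothetical second parameter, and schema and test are the same objects as above:

def fn_wrapper_multi(df, val, prefix):

    @f.pandas_udf(schema, f.PandasUDFType.GROUPED_MAP)
    def fn(pdf):
        pdf['c3'] = pdf.shape[0] + val     # first captured argument
        pdf['c2'] = prefix + pdf['c2']     # second captured argument
        return pdf

    return df.groupby('c1', 'c2').apply(fn)

fn_wrapper_multi(test, 7, 'x_').show()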
Upvotes: 3
Reputation: 321
I like @hwrd's idea, but would instead make it a generator pattern so it is easier to integrate, as in @Feng's example:
def function_generator(key):
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def function(interval):
        interval_df = interval.filter(interval["var"] == key).toPandas()
        for value in interval_df:
            # Apply some operations
            pass
        # ... and return a pandas DataFrame matching the schema
    return function

calc_diff = function_generator('arg1')
output = Data.groupBy("msn").apply(calc_diff)
Upvotes: 22
Reputation: 431
You can create the pandas UDF inside your function, so that the function arguments are known to it at the time of its creation. (Or you can import functools and use partial function evaluation to do the same thing.) Here is the example from the PySpark documentation, modified to pass in some parameters:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def my_function(df, by="id", column="v", value=1.0):
    schema = "{} long, {} double".format(by, column)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def subtract_value(pdf):
        # pdf is a pandas.DataFrame
        v = pdf[column]
        g = pdf[by]
        return pdf.assign(v=v - g * value)

    return df.groupby(by).apply(subtract_value)

my_function(df, by="id", column="v", value=2.0).show()
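The functools.partial route mentioned above can be read in more than one way; one possible sketch (an assumption, not taken from the original answer) is to bind the extra parameters with partial and keep a thin one-argument wrapper, so the GROUPED_MAP arity check still sees a single-argument function:

import functools
from pyspark.sql.functions import pandas_udf, PandasUDFType

def subtract_value_impl(pdf, column, by, value):
    # pure pandas logic with all parameters explicit
    return pdf.assign(**{column: pdf[column] - pdf[by] * value})

# bind the extra arguments up front
bound = functools.partial(subtract_value_impl, column="v", by="id", value=2.0)

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_value(pdf):
    return bound(pdf)

df.groupby("id").apply(subtract_value).show()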
Upvotes: 11
Reputation: 111
I think you could do something like this
def myfun(data, key, interval):
    # Apply some operations
    return something

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def myfun_udf(data):
    return myfun(data=data, key=mykey, interval=myinterval)

mykey = 1
myinterval = 2

Data.groupBy("msn").apply(myfun_udf)
Upvotes: 2