Reputation: 19308
This question talks about how to chain custom PySpark 2 transformations.
The DataFrame#transform method was added to the PySpark 3 API.
The code snippet below shows a custom transformation that takes no arguments and works as expected, and another custom transformation that takes arguments and does not work.
from pyspark.sql.functions import col, lit

df = spark.createDataFrame([(1, 1.0), (2, 2.)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int).show()
Here's what's outputted:
+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+
How should the with_funny() method be defined to output a value with the PySpark 3 API?
Upvotes: 6
Views: 4195
Reputation: 2718
This has been solved in PySpark 3.3.0, where DataFrame.transform accepts positional and keyword arguments to pass along to the transformation function:
def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any) -> "DataFrame":
    """Returns a new :class:`DataFrame`. Concise syntax for chaining custom transformations.

    .. versionadded:: 3.0.0

    Parameters
    ----------
    func : function
        a function that takes and returns a :class:`DataFrame`.
    *args
        Positional arguments to pass to func.

        .. versionadded:: 3.3.0
    **kwargs
        Keyword arguments to pass to func.

        .. versionadded:: 3.3.0

    Examples
    --------
    >>> from pyspark.sql.functions import col
    >>> df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])
    >>> def cast_all_to_int(input_df):
    ...     return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])
    >>> def sort_columns_asc(input_df):
    ...     return input_df.select(*sorted(input_df.columns))
    >>> df.transform(cast_all_to_int).transform(sort_columns_asc).show()
    +-----+---+
    |float|int|
    +-----+---+
    |    1|  1|
    |    2|  2|
    +-----+---+

    >>> def add_n(input_df, n):
    ...     return input_df.select([(col(col_name) + n).alias(col_name)
    ...                             for col_name in input_df.columns])
    >>> df.transform(add_n, 1).transform(add_n, n=10).show()
    +---+-----+
    |int|float|
    +---+-----+
    | 12| 12.0|
    | 13| 13.0|
    +---+-----+
    """
    result = func(self, *args, **kwargs)
    assert isinstance(
        result, DataFrame
    ), "Func returned an instance of type [%s], " "should have been DataFrame." % type(result)
    return result
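Applied to the question's example, with_funny no longer needs to return an inner function: with PySpark >= 3.3.0 the DataFrame can be the first parameter and the word can be forwarded by transform() itself. A minimal sketch, reusing the df from the question (this with_funny is a hypothetical rewrite, not the original):

from pyspark.sql.functions import lit

def with_funny(df, word):
    # The DataFrame is now the first positional argument, so transform()
    # can forward the literal directly (requires PySpark >= 3.3.0).
    return df.withColumn("funny", lit(word))

df.transform(with_funny, "bumfuzzle").show()
df.transform(with_funny, word="bumfuzzle").show()  # keyword form also works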
Upvotes: 2
Reputation: 118
If I understood correctly, your first transformation adds a new column containing the literal string passed as an argument, and the last transformation casts all the columns to int, correct?
Since casting a string to int returns a null value, your final output is actually correct:
from pyspark.sql.functions import col, lit

df = spark.createDataFrame([(1, 1.0), (2, 2.)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

# first transform
df1 = df.transform(with_funny("bumfuzzle"))
df1.show()

# second transform
df2 = df1.transform(cast_all_to_int)
df2.show()

# all together
df_final = df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int)
df_final.show()
Output:
+---+-----+---------+
|int|float|    funny|
+---+-----+---------+
|  1|  1.0|bumfuzzle|
|  2|  2.0|bumfuzzle|
+---+-----+---------+
+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+
+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+
Maybe what you want is to switch the order of your transformations, like this:
df_final = df.transform(cast_all_to_int).transform(with_funny("bumfuzzle"))
df_final.show()
Output:
+---+-----+---------+
|int|float|    funny|
+---+-----+---------+
|  1|    1|bumfuzzle|
|  2|    2|bumfuzzle|
+---+-----+---------+
Upvotes: 5