Reputation: 716
Given a dataframe like:
A0 A1 A2 A3
0 9 1 2 8
1 9 7 6 9
2 1 7 4 6
3 0 8 4 8
4 0 1 6 0
5 7 1 4 3
6 6 3 5 9
7 3 3 2 8
8 6 3 0 8
9 3 2 7 1
I need to apply a function to a set of columns, row by row, to create a new column with the results of that function.
An example in Pandas is:
import numpy as np
import pandas as pd

df = pd.DataFrame(data=None, columns=['A0', 'A1', 'A2', 'A3'])
df['A0'] = np.random.randint(0, 10, 10)
df['A1'] = np.random.randint(0, 10, 10)
df['A2'] = np.random.randint(0, 10, 10)
df['A3'] = np.random.randint(0, 10, 10)
df['mean'] = df.mean(axis=1)
df['std'] = df.iloc[:, :-1].std(axis=1)
df['any'] = df.iloc[:, :-2].apply(lambda x: np.sum(x), axis=1)
And the result is:
A0 A1 A2 A3 mean std any
0 9 1 2 8 5.00 4.082483 20
1 9 7 6 9 7.75 1.500000 31
2 1 7 4 6 4.50 2.645751 18
3 0 8 4 8 5.00 3.829708 20
4 0 1 6 0 1.75 2.872281 7
5 7 1 4 3 3.75 2.500000 15
6 6 3 5 9 5.75 2.500000 23
7 3 3 2 8 4.00 2.708013 16
8 6 3 0 8 4.25 3.500000 17
9 3 2 7 1 3.25 2.629956 13
How can I do something similar in PySpark?
Upvotes: 0
Views: 2996
Reputation: 581
The above answer is great; however, since the OP is using Python/PySpark, the logic there is not 100% clear if you don't understand Spark SQL expressions.
I would suggest using a Pandas UDAF: unlike plain UDFs, these are vectorized and very efficient. They were added to the Spark API to lower the learning curve needed to migrate from Pandas to Spark. This also makes your code more maintainable if most of your colleagues, like mine, are more familiar with Pandas/Python.
These are the types of Pandas UDAFs available, with their Pandas equivalents:

Spark UDAF type   Pandas equivalent    Works on      Input → Output
SCALAR            df.transform(...)    Mapping       Series → Series
GROUPED_MAP       df.apply(...)        Group & Map   DataFrame → DataFrame
GROUPED_AGG       df.aggregate(...)    Reduce        Series → Scalar
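For the row-wise statistics in the question, the SCALAR type is the right fit. Here is a minimal sketch, assuming Spark 2.3+ with PyArrow installed, the question's DataFrame df with columns A0..A3, and my own helper names row_mean and row_std:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

# SCALAR pandas UDF: each argument arrives as a pandas Series holding one
# column of the current batch, and we return a Series of the same length.
@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def row_mean(a0, a1, a2, a3):
    return pd.concat([a0, a1, a2, a3], axis=1).mean(axis=1)

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def row_std(a0, a1, a2, a3):
    return pd.concat([a0, a1, a2, a3], axis=1).std(axis=1)

df.withColumn("mean", row_mean("A0", "A1", "A2", "A3")) \
  .withColumn("std", row_std("A0", "A1", "A2", "A3")) \
  .show()
Because the work is done on whole batches rather than one row at a time, this is usually much faster than a plain Python UDF. (On Spark 3.x you would typically write the same thing with the type-hint form of pandas_udf, since the PandasUDFType style is deprecated there.)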
Upvotes: 1
Reputation: 32640
For Spark 2.4+, you can use the aggregate function. First, create an array column values from all the DataFrame columns. Then calculate the any, mean and std columns like this:

- any: aggregate to sum the array elements
- mean: divide the any column by the size of the array values
- std: aggregate to sum (x - mean) ** 2, divide by the length - 1 of the array, then take the square root

Here is the associated code:
from pyspark.sql.functions import expr, sqrt, size, col, array
data = [
(9, 1, 2, 8), (9, 7, 6, 9), (1, 7, 4, 6),
(0, 8, 4, 8), (0, 1, 6, 0), (7, 1, 4, 3),
(6, 3, 5, 9), (3, 3, 2, 8), (6, 3, 0, 8),
(3, 2, 7, 1)
]
df = spark.createDataFrame(data, ['A0', 'A1', 'A2', 'A3'])
cols = df.columns
df.withColumn("values", array(*cols)) \
.withColumn("any", expr("aggregate(values, 0D, (acc, x) -> acc + x)")) \
.withColumn("mean", col("any") / size(col("values"))) \
.withColumn("std", sqrt(expr("""aggregate(values, 0D,
(acc, x) -> acc + power(x - mean, 2),
acc -> acc / (size(values) -1))"""
)
)) \
.drop("values") \
.show(truncate=False)
#+---+---+---+---+----+----+------------------+
#|A0 |A1 |A2 |A3 |any |mean|std |
#+---+---+---+---+----+----+------------------+
#|9 |1 |2 |8 |20.0|5.0 |4.08248290463863 |
#|9 |7 |6 |9 |31.0|7.75|1.5 |
#|1 |7 |4 |6 |18.0|4.5 |2.6457513110645907|
#|0 |8 |4 |8 |20.0|5.0 |3.8297084310253524|
#|0 |1 |6 |0 |7.0 |1.75|2.8722813232690143|
#|7 |1 |4 |3 |15.0|3.75|2.5 |
#|6 |3 |5 |9 |23.0|5.75|2.5 |
#|3 |3 |2 |8 |16.0|4.0 |2.70801280154532 |
#|6 |3 |0 |8 |17.0|4.25|3.5 |
#|3 |2 |7 |1 |13.0|3.25|2.6299556396765835|
#+---+---+---+---+----+----+------------------+
Spark < 2.4:
You can use functools.reduce and operator.add to sum the columns. The logic remains the same as above:
from functools import reduce
from operator import add
df.withColumn("any", reduce(add, [col(c) for c in cols])) \
.withColumn("mean", col("any") / len(cols)) \
.withColumn("std", sqrt(reduce(add, [(col(c) - col("mean")) ** 2 for c in cols]) / (len(cols) -1)))\
.show(truncate=False)
Upvotes: 3