Reputation: 4482
I have the following DataFrame in Spark:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import col, when, lit

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

ddf = spark.createDataFrame(
    [[None, 'Michael', 2],
     [30, 'Andy', 3],
     [19, 'Justin', 4],
     [30, 'James Dr No From Russia with Love Bond', 6]],
    schema=['age', 'name', 'weights'])
ddf.show()
In this trivial example I would like to create two columns: one with the weighted mean of age where age > 29 (named weighted_age), and the other with age^2 where age <= 29 (named age_squared).
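By weighted mean I mean sum(age * weights) / sum(weights); over the rows with age > 29 in this data that works out to (30*3 + 30*6) / (3 + 6) = 270 / 9 = 30.0.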
Upvotes: 2
Views: 3726
Reputation: 41957
You should first compute the weighted mean over the whole dataset where age > 29, and then populate it using withColumn. This is because the weighted mean depends on the whole dataset, whereas age_squared can be computed row by row:
from pyspark.sql import functions as f

# Weighted mean of age over the rows with age > 29: sum(age * weights) / sum(weights)
weightedMean = ddf.filter(f.col('age') > 29) \
    .select(f.sum(f.col('age') * f.col('weights')) / f.sum(f.col('weights'))) \
    .first()[0]

# Populate each column only where its condition holds; other rows default to null
ddf.withColumn('weighted_age', f.when(f.col('age') > 29, weightedMean)) \
    .withColumn('age_squared', f.when(f.col('age') <= 29, f.col('age') * f.col('age'))) \
    .show(truncate=False)
which should give you
+----+--------------------------------------+-------+------------+-----------+
|age |name |weights|weighted_age|age_squared|
+----+--------------------------------------+-------+------------+-----------+
|null|Michael |2 |null |null |
|30 |Andy |3 |30.0 |null |
|19 |Justin |4 |null |361 |
|30 |James Dr No From Russia with Love Bond|6 |30.0 |null |
+----+--------------------------------------+-------+------------+-----------+
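Alternatively, you can avoid collecting the aggregate to the driver with first() and keep everything in one plan by using an unpartitioned window. A minimal sketch (note that a window with no partitioning moves all rows to a single partition, which is fine for small data; sum ignores the nulls produced by when without otherwise):

from pyspark.sql import Window

w = Window.partitionBy()  # empty partitionBy() spans the whole DataFrame

ddf.withColumn('weighted_age',
               f.when(f.col('age') > 29,
                      f.sum(f.when(f.col('age') > 29, f.col('age') * f.col('weights'))).over(w)
                      / f.sum(f.when(f.col('age') > 29, f.col('weights'))).over(w))) \
    .withColumn('age_squared', f.when(f.col('age') <= 29, f.col('age') * f.col('age'))) \
    .show(truncate=False)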
You can populate another value by chaining .otherwise onto the when function instead of getting the default null.
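For instance, a minimal sketch that fills 0 instead of null for age_squared (note that every row where the condition is not true, including the row with a null age, gets the fallback):

# Rows failing the condition get 0 instead of null
ddf.withColumn('age_squared',
               f.when(f.col('age') <= 29, f.col('age') * f.col('age')).otherwise(0)) \
    .show(truncate=False)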
Upvotes: 4