Reputation: 367
I am practising with PySpark and I need to obtain something like the following. I have a dataset grouped by title, let's say, and for each title I need to fill a (new) third column with the minimum of another column, going from this table:
title | var | new_col |
---|---|---|
item1 | 3 | 0 |
item1 | 4 | 0 |
item1 | 5 | 0 |
item2 | 2 | 0 |
item2 | 10 | 0 |
to this table:
title | var | min |
---|---|---|
item1 | 3 | 3 |
item1 | 4 | 3 |
item1 | 5 | 3 |
item2 | 2 | 2 |
item2 | 10 | 2 |
So far I have succeeded in grouping the first table, but then I only end up with two rows: item1 with a min of 3, and item2 with 2. Any suggestion on how I can proceed from there?
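For reference, a minimal sketch that reproduces the sample data above (column names taken from the tables; the SparkSession setup is assumed):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Build the first table shown above; new_col starts out as 0 everywhere.
    df = spark.createDataFrame(
        [("item1", 3, 0), ("item1", 4, 0), ("item1", 5, 0),
         ("item2", 2, 0), ("item2", 10, 0)],
        ["title", "var", "new_col"],
    )
    df.show()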
Upvotes: 1
Views: 1557
Reputation: 42422
You can use window functions for this purpose.
from pyspark.sql import functions as F, Window
df2 = df.select(
    'title',
    'var',
    F.min('var').over(Window.partitionBy('title')).alias('min')
)
Or, for a simpler syntax:
df2 = df.selectExpr('title', 'var', 'min(var) over(partition by title) min')
Upvotes: 2
Reputation: 11090
Partition by title, order by var, and take the first row of each partition (Scala below; a PySpark sketch follows it).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("title").orderBy(col("var"))
df.withColumn("min", row_number().over(w)).where(col("min") === 1)
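A PySpark equivalent of the same row_number idea might look like this (a sketch using the same column names, not part of the original answer):

    from pyspark.sql import functions as F, Window

    # Number rows within each title by ascending var, then keep only the first row.
    w = Window.partitionBy("title").orderBy(F.col("var"))
    first_rows = df.withColumn("rn", F.row_number().over(w)).where(F.col("rn") == 1).drop("rn")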
Or, aggregate per title directly (this yields one row per title):
from pyspark.sql import functions as f
df.groupBy("title").agg(f.min("var").alias("min"))
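Note that the groupBy form reproduces the situation described in the question (one row per title). To attach the minimum back onto every original row, one option is to join the aggregated result back to the original DataFrame. A rough sketch, assuming the same column names:

    from pyspark.sql import functions as f

    mins = df.groupBy("title").agg(f.min("var").alias("min"))
    df_with_min = df.join(mins, on="title", how="left")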
Upvotes: 0
Reputation: 41
Define a window specification which partitions the data over the "title" column. Then, within each partition, take the minimum over all rows of that partition. This gives you the minimum value alongside the other columns.
import sys

from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy(df['title']).orderBy(df['title'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)
min_var = F.min(df['var']).over(windowSpec)
min_df = df.select("title", "var", min_var.alias("min"))
min_df.printSchema()
min_df.show()
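As a side note, a sketch of the same window using the named bounds constants that PySpark exposes, which avoids the sys.maxsize idiom (and since the minimum over a whole partition does not depend on ordering, the orderBy could also be dropped, as in the first answer above):

    from pyspark.sql.window import Window

    # Equivalent unbounded window using the named bounds constants.
    windowSpec = (
        Window.partitionBy(df['title'])
        .orderBy(df['title'].desc())
        .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )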
Upvotes: 0