Reputation: 11927
I have PySpark code like this:
import pyspark.sql.functions as F
from pyspark.sql import Window

spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
    F.first('c1').alias('c1'),
    F.last('c2').alias('c2'),
    F.first('c3').alias('c3'))
I need to keep the data ordered by id, a1 and c1, and then select the columns as shown above within each group defined by the keys id, a1 and a2.
Because first and last are non-deterministic after a shuffle (groupBy does not preserve the sort order), I changed the code to the following ugly-looking version. It works, but I'm not sure it is efficient.
w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))
out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)) \
    .filter(F.col('Rank_First') == 1).drop('Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)) \
    .filter(F.col('Rank_Last') == 1).drop('Rank_Last')
out_first = out_first.withColumnRenamed('c1', 'First_c1') \
    .withColumnRenamed('c2', 'First_c2') \
    .withColumnRenamed('c3', 'First_c3')
out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
    .withColumnRenamed('c2', 'Last_c2') \
    .withColumnRenamed('c3', 'Last_c3')
out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
    .select('id', 'a1', 'a2',
            F.col('First_c1').alias('c1'),
            F.col('Last_c2').alias('c2'),
            F.col('First_c3').alias('c3'))
I was looking for a better, more efficient alternative, since I run into performance bottlenecks when the data size is huge. Is there a better way to compute first and last over a window with a specific ordering in one go?
Upvotes: 3
Views: 2327
Reputation: 32700
When using orderBy with a Window you need to specify the frame boundaries as ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, otherwise the last function will only see the values between UNBOUNDED PRECEDING and CURRENT ROW (the default frame bounds when an orderBy is specified).
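Here is a minimal illustration of that default frame (a toy example of my own, not from the question; the sample values are made up):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import last

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10), (1, 20), (1, 30)], ['id', 'c1'])

# Default frame with orderBy: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
# so last('c1') just returns the current row's c1 (10, 20, 30).
w_default = Window.partitionBy('id').orderBy('c1')
df.withColumn('last_c1', last('c1').over(w_default)).show()

# Explicit full frame: last('c1') is 30 on every row of the partition.
w_full = w_default.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('last_c1', last('c1').over(w_full)).show()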
Try this:
from pyspark.sql import Window
from pyspark.sql.functions import first, last

w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = df.withColumn("First_c1", first("c1").over(w)) \
    .withColumn("First_c3", first("c3").over(w)) \
    .withColumn("Last_c2", last("c2").over(w))
df.groupby("id", "a1", "a2") \
    .agg(first("First_c1").alias("c1"),
         first("Last_c2").alias("c2"),
         first("First_c3").alias("c3")) \
    .show()
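To sanity-check the approach end to end, here is a small runnable sketch on made-up data (the spark session, sample rows and column values are mine, for illustration only). After the window step every row in a group carries the same First_/Last_ values, so taking first in the final aggregation is safe:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import first, last

spark = SparkSession.builder.getOrCreate()

# One (id, a1, a2) group with three rows, ordered by c1.
df = spark.createDataFrame(
    [(1, 'x', 'y', 1, 'b1', 'ca'),
     (1, 'x', 'y', 2, 'b2', 'cb'),
     (1, 'x', 'y', 3, 'b3', 'cc')],
    ['id', 'a1', 'a2', 'c1', 'c2', 'c3'])

w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

out = df.withColumn('First_c1', first('c1').over(w)) \
    .withColumn('First_c3', first('c3').over(w)) \
    .withColumn('Last_c2', last('c2').over(w)) \
    .groupBy('id', 'a1', 'a2') \
    .agg(first('First_c1').alias('c1'),
         first('Last_c2').alias('c2'),
         first('First_c3').alias('c3'))

out.show()
# Expected single row: c1 = 1 (first by c1), c2 = 'b3' (last by c1), c3 = 'ca' (first by c1)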
Upvotes: 4