Reputation: 1081
I have a dataframe like this:
usr sec scrpt
0 1 5 This
1 2 10 is
2 3 12 a
3 1 7 string
4 2 4 oreo
I am trying to sort by usr and sec, then group by usr and concatenate the strings. The table records, for each user's call, what was said in which second. The resulting dataframe should look like:
user concated
1 this string
2 oreo is
3 a
I have tried the below in pandas and it works fine:
df.sort_values(by=['usr','sec'], ascending=[True, True]).groupby('usr')['scrpt'].apply(lambda x: ' '.join(x)).reset_index()
Could anyone give me the equivalent in PySpark?
Upvotes: 1
Views: 3229
Reputation: 31490
For Spark 2.4+, use the array_join, sort_array, and transform functions for this case.
#sample dataframe
from pyspark.sql.functions import array_join, concat_ws, expr, lower

df=spark.createDataFrame([(1,5,"This"),(2,10,"is"),(3,12,"a"),(1,7,"string"),(2,4,"oreo")],["usr","sec","scrpt"])
df.show()
#+---+---+------+
#|usr|sec| scrpt|
#+---+---+------+
#| 1| 5| This|
#| 2| 10| is|
#| 3| 12| a|
#| 1| 7|string|
#| 2| 4| oreo|
#+---+---+------+
#option 1: array_join
df.groupBy("usr") \
  .agg(array_join(expr("transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)"), " ").alias("concated")) \
  .orderBy("usr") \
  .show(10, False)

#option 2: concat_ws, same result
df.groupBy("usr") \
  .agg(concat_ws(" ", expr("transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)")).alias("concated")) \
  .orderBy("usr") \
  .show(10, False)
#+---+-----------+
#|usr|concated |
#+---+-----------+
#|1 |This string|
#|2 |oreo is |
#|3 |a |
#+---+-----------+
#lower case
df.groupBy("usr") \
  .agg(lower(array_join(expr("transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)"), " ")).alias("concated")) \
  .orderBy("usr") \
  .show(10, False)
#+---+-----------+
#|usr|concated |
#+---+-----------+
#|1 |this string|
#|2 |oreo is |
#|3 |a |
#+---+-----------+
Upvotes: 3
Reputation: 294
You can use Window functions to accomplish what you want in PySpark.
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
# Construct a window to construct sentences
sentence_window = Window.partitionBy('usr').orderBy(sf.col('sec').asc())
# Construct a window to get the last sentence. The others will be sentence fragments spoken by the user.
rank_window = Window.partitionBy('usr').orderBy(sf.col('sec').desc())
# spark_data_df is the input dataframe from the question
user_sentences = spark_data_df.select(
    'usr',
    sf.collect_list(sf.col('scrpt')).over(sentence_window).alias('sentence'),
    sf.rank().over(rank_window).alias('rank'))
user_sentences = user_sentences.filter("rank = 1").drop('rank')
user_sentences = user_sentences.withColumn('sentence', sf.concat_ws(' ', sf.col('sentence')))
user_sentences.show(10, False)
Upvotes: 1