Puneet Sinha

Reputation: 1081

Pyspark : order/sort by then group by and concat string

I have a dataframe like this:

   usr     sec    scrpt
0  1        5     This
1  2        10      is
2  3        12       a
3  1        7    string
4  2        4      oreo

I am trying to order/sort by usr and sec, then group by usr and concatenate the strings. Each row of this table records what a user said at a given second, so the resulting dataframe should look like:

user   concated
1      this string
2      oreo is
3      a

I have tried below in python and works fine

df.sort_values(by=['usr','sec'], ascending=[True, True]).groupby('usr')['scrpt'].apply(lambda x: ' '.join(x)).reset_index()

Could anyone show me the equivalent in PySpark?

Upvotes: 1

Views: 3229

Answers (2)

notNull

Reputation: 31490

From Spark 2.4+ you can use the array_join, sort_array, and transform functions for this case: collect (sec, scrpt) structs per user, sort them by sec, extract the strings, then join them.

#sample dataframe
from pyspark.sql.functions import expr, array_join, concat_ws, lower

df=spark.createDataFrame([(1,5,"This"),(2,10,"is"),(3,12,"a"),(1,7,"string"),(2,4,"oreo")],["usr","sec","scrpt"])

df.show()
#+---+---+------+
#|usr|sec| scrpt|
#+---+---+------+
#|  1|  5|  This|
#|  2| 10|    is|
#|  3| 12|     a|
#|  1|  7|string|
#|  2|  4|  oreo|
#+---+---+------+

#using array_join
df.groupBy("usr").agg(array_join(expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")," ").alias("concated")).orderBy("usr").show(10,False)

#equivalent, using concat_ws
df.groupBy("usr").agg(concat_ws(" ",expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")).alias("concated")).orderBy("usr").show(10,False)
#+---+-----------+
#|usr|concated   |
#+---+-----------+
#|1  |This string|
#|2  |oreo is    |
#|3  |a          |
#+---+-----------+

#lower case
df.groupBy("usr").agg(lower(array_join(expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")," ")).alias("concated")).orderBy("usr").show(10,False)
#+---+-----------+
#|usr|concated   |
#+---+-----------+
#|1  |this string|
#|2  |oreo is    |
#|3  |a          |
#+---+-----------+

Upvotes: 3

Srikant

Reputation: 294

You can use window functions to accomplish what you want in PySpark.

    import pyspark.sql.functions as sf
    from pyspark.sql.window import Window

    # Window to build up each user's sentence in order of `sec`
    sentence_window = Window.partitionBy('usr').orderBy(sf.col('sec').asc())

    # Window to pick out the last (complete) sentence per user.
    # The other rows hold partial sentence fragments.
    rank_window = Window.partitionBy('usr').orderBy(sf.col('sec').desc())

    user_sentences = spark_data_df.select('usr',
                                          sf.collect_list(sf.col('scrpt')).over(sentence_window).alias('sentence'),
                                          sf.rank().over(rank_window).alias('rank'))

    user_sentences = user_sentences.filter("rank = 1").drop('rank')
    user_sentences = user_sentences.withColumn('sentence', sf.concat_ws(' ', sf.col('sentence')))

    user_sentences.show(10, False)

Upvotes: 1
