Siddharth Satpathy

Reputation: 3043

Concatenate PySpark rows using windows

I have the following PySpark dataframe.

+-----+---------------------+-------------------+
| name|           someString|          timestamp|
+-----+---------------------+-------------------+
|name1|            A-aa1-aa2|2012-01-10 00:00:00|
|name1|                 B-bb|2012-01-11 00:00:00|
|name1|            C-cc1-cc2|2012-01-13 00:00:00|
|name1|        D-dd1-dd2-dd3|2012-01-14 00:00:00|
|name1|                 E-ee|2012-01-15 00:00:00|
|name2|                 F-ff|2012-01-10 00:00:00|
|name2|        G-gg1-gg2-gg3|2012-01-11 00:00:00|
|name2|        H-hh1-hh2-hh3|2012-01-12 00:00:00|
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|
+-----+---------------------+-------------------+

From this dataframe, I want to create a new dataframe (say df2) that keeps all columns of df1 and adds a column named "concatStrings", which concatenates the values of the column someString over a rolling time window of 3 days for every unique name.

In the above example, I want df2 to look like the following:

+-----+---------------------+-------------------+--------------------------------------+
| name|           someString|          timestamp|                         concatStrings|
+-----+---------------------+-------------------+--------------------------------------+
|name1|            A-aa1-aa2|2012-01-10 00:00:00|   A-aa1-aa2                          | 
|name1|                 B-bb|2012-01-11 00:00:00|   A-aa1-aa2-B-bb                     | 
|name1|            C-cc1-cc2|2012-01-13 00:00:00|   B-bb-C-cc1-cc2                     |
|name1|        D-dd1-dd2-dd3|2012-01-14 00:00:00|   C-cc1-cc2-D-dd1-dd2-dd3            | 
|name1|                 E-ee|2012-01-15 00:00:00|   C-cc1-cc2-D-dd1-dd2-dd3-E-ee       |
|name2|                 F-ff|2012-01-10 00:00:00|   F-ff                               |
|name2|        G-gg1-gg2-gg3|2012-01-11 00:00:00|   F-ff-G-gg1-gg2-gg3                 |
|name2|        H-hh1-hh2-hh3|2012-01-12 00:00:00|   F-ff-G-gg1-gg2-gg3-H-hh1-hh2-hh3   |
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|   H-hh1-hh2-hh3-I-ii1-ii2-ii3-ii4-ii5|
+-----+---------------------+-------------------+--------------------------------------+

How can I do this?

Below is the code that I have tried so far:

# 3-day rolling window per name: the current row plus the previous 2 days, in seconds
win_ts = (
    Window.partitionBy("name")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-(3 - 1) * 86400, 0)
)

df2 = df1.withColumn("concatStrings", F.concat_ws("-", F.col("someString").over(win_ts)))

But I get the following error when I run the above code snippet:

Py4JJavaError: An error occurred while calling o235.withColumn.
: org.apache.spark.sql.AnalysisException: Expression 'someString#72' not supported within a window function.;;
Project [name#70, someString#72, timestamp#76, concatStrings#124]
+- Project [name#70, someString#72, timestamp#76, _w0#125L, _we0#126, concat_ws(-, _we0#126) AS concatStrings#124]
   +- Window [someString#72 windowspecdefinition(name#70, _w0#125L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -172800, currentrow$())) AS _we0#126], [name#70], [_w0#125L ASC NULLS FIRST]
      +- Project [name#70, someString#72, timestamp#76, cast(timestamp#76 as bigint) AS _w0#125L]
         +- Project [name#70, someString#72, timestamp#76]

You can use the following code to generate the dataframe df1:

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.getOrCreate()

df1_Stats = Row("name", "timestamp1", "someString")

df1_stat1 = df1_Stats("name1", "2012-01-10 00:00:00", "A-aa1-aa2")
df1_stat2 = df1_Stats("name1", "2012-01-11 00:00:00", "B-bb")
df1_stat3 = df1_Stats("name1", "2012-01-13 00:00:00", "C-cc1-cc2")
df1_stat4 = df1_Stats("name1", "2012-01-14 00:00:00", "D-dd1-dd2-dd3")
df1_stat5 = df1_Stats("name1", "2012-01-15 00:00:00", "E-ee")
df1_stat6 = df1_Stats("name2", "2012-01-10 00:00:00", "F-ff")
df1_stat7 = df1_Stats("name2", "2012-01-11 00:00:00", "G-gg1-gg2-gg3")
df1_stat8 = df1_Stats("name2", "2012-01-12 00:00:00", "H-hh1-hh2-hh3")
df1_stat9 = df1_Stats("name2", "2012-01-14 00:00:00", "I-ii1-ii2-ii3-ii4-ii5")

df1_stat_lst = [
    df1_stat1,
    df1_stat2,
    df1_stat3,
    df1_stat4,
    df1_stat5,
    df1_stat6,
    df1_stat7,
    df1_stat8,
    df1_stat9,
]

df1 = spark.createDataFrame(df1_stat_lst)
df1 = df1.withColumn("timestamp", df1["timestamp1"].cast(TimestampType())).drop("timestamp1")

Upvotes: 2

Views: 3386

Answers (1)

Seb

Reputation: 539

concat_ws cannot be evaluated as a window function, which is what the AnalysisException is telling you. You first need to collect the values as a list with collect_list (an aggregate function, so it is supported over a window) and then use array_join to turn that list into a string. This should work:

df2 = df1.withColumn("concatStrings" , F.collect_list("someString").over(win_ts)) \
         .withColumn("concatStrings", F.array_join("concatStrings", '-'))

Upvotes: 4
