Reputation: 3043
I have the following PySpark dataframe.
| name| someString| timestamp|
|name1| A-aa1-aa2|2012-01-10 00:00:00|
|name1| B-bb|2012-01-11 00:00:00|
|name1| C-cc1-cc2|2012-01-13 00:00:00|
|name1| D-dd1-dd2-dd3|2012-01-14 00:00:00|
|name1| E-ee|2012-01-15 00:00:00|
|name2| F-ff|2012-01-10 00:00:00|
|name2| G-gg1-gg2-gg3|2012-01-11 00:00:00|
|name2| H-hh1-hh2-hh3|2012-01-12 00:00:00|
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|
In this dataframe, I want to create a new dataframe (say df2
) which has a column (named "concatStrings"
) which concatenates all elements from rows in the column someString
across a rolling time window of 3 days for every unique name type (alongside all columns of df1
In the above example, I want df2
to look like the following:
| name| someString| timestamp| concatStrings|
|name1| A-aa1-aa2|2012-01-10 00:00:00| A-aa1-aa2 |
|name1| B-bb|2012-01-11 00:00:00| A-aa1-aa2-B-bb |
|name1| C-cc1-cc2|2012-01-13 00:00:00| B-bb-C-cc1-cc2 |
|name1| D-dd1-dd2-dd3|2012-01-14 00:00:00| C-cc1-cc2-D-dd1-dd2-dd3 |
|name1| E-ee|2012-01-15 00:00:00| C-cc1-cc2-D-dd1-dd2-dd3-E-ee |
|name2| F-ff|2012-01-10 00:00:00| F-ff |
|name2| G-gg1-gg2-gg3|2012-01-11 00:00:00| F-ff-G-gg1-gg2-gg3 |
|name2| H-hh1-hh2-hh3|2012-01-12 00:00:00| F-ff-G-gg1-gg2-gg3-H-hh1-hh2-hh3 |
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00| H-hh1-hh2-hh3-I-ii1-ii2-ii3-ii4-ii5|
How can I do this?
Below is the code that I have tried till now:
win_ts = (
.rangeBetween(-(3 - 1) * 86400, 0)
df2 = df1.withColumn( "concatStrings" , F.concat_ws("-" , F.col("someString").over(win_ts) ) )
But, I get the below mentioned error when I tried the above code snippet:
Py4JJavaError: An error occurred while calling o235.withColumn.
: org.apache.spark.sql.AnalysisException: Expression 'someString#72' not supported within a window function.;;
Project [name#70, someString#72, timestamp#76, concatStrings#124]
+- Project [name#70, someString#72, timestamp#76, _w0#125L, _we0#126, concat_ws(-, _we0#126) AS concatStrings#124]
+- Window [someString#72 windowspecdefinition(name#70, _w0#125L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -172800, currentrow$())) AS _we0#126], [name#70], [_w0#125L ASC NULLS FIRST]
+- Project [name#70, someString#72, timestamp#76, cast(timestamp#76 as bigint) AS _w0#125L]
+- Project [name#70, someString#72, timestamp#76]
You can use the below mentioned code to generate the dataframe df1
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import *
from pyspark.sql.types import TimestampType
df1_Stats = Row("name", "timestamp1", "someString")
df1_stat1 = df1_Stats("name1", "2012-01-10 00:00:00", "A-aa1-aa2")
df1_stat2 = df1_Stats("name1", "2012-01-11 00:00:00", "B-bb")
df1_stat3 = df1_Stats("name1", "2012-01-13 00:00:00", "C-cc1-cc2")
df1_stat4 = df1_Stats("name1", "2012-01-14 00:00:00", "D-dd1-dd2-dd3")
df1_stat5 = df1_Stats("name1", "2012-01-15 00:00:00", "E-ee")
df1_stat6 = df1_Stats("name2", "2012-01-10 00:00:00", "F-ff")
df1_stat7 = df1_Stats("name2", "2012-01-11 00:00:00", "G-gg1-gg2-gg3")
df1_stat8 = df1_Stats("name2", "2012-01-12 00:00:00", "H-hh1-hh2-hh3")
df1_stat9 = df1_Stats("name2", "2012-01-14 00:00:00", "I-ii1-ii2-ii3-ii4-ii5")
df1_stat_lst = [
df1 = spark.createDataFrame(df1_stat_lst)
df1 = df1.withColumn( "timestamp" , df1["timestamp1"].cast(TimestampType()) ).drop("timestamp1")
Upvotes: 2
Views: 3386
Reputation: 539
You first need to collect as list and then use array_join
to convert it into a string. This should work:
df2 = df1.withColumn("concatStrings" , F.collect_list("someString").over(win_ts)) \
.withColumn("concatStrings", F.array_join("concatStrings", '-'))
Upvotes: 4