Reputation: 61
I am reading a legacy file into a DataFrame, and it looks something like this:
+-----------+----------+----------+--------+--------+--------+
|c1 | c2 | c3 | c4 | c5 | c6 |
+-----------+----------+----------+--------+--------+--------+
| 01 | B01 |null |null |file1 |B01-01 |
| 06 | B01 |foo |bar |file1 |B01-02 |
| 06 | B01 |foo |bar |file1 |B01-03 |
| 09 | B01 |2021-12-07|null |file1 |B01-04 |
| 01 | B02 |null |null |file2 |B02-01 |
| 09 | B02 |2021-12-07|null |file2 |B02-02 |
| 01 | B03 |null |null |file3 |B03-01 |
| 06 | B03 |foo |bar |file3 |B03-02 |
| 06 | B03 |foo |bar |file3 |B03-03 |
| 09 | B03 |2021-12-07|null |file3 |B03-04 |
| 01 | B01 |null |null |file4 |B01-01 |
| 06 | B01 |foo |bar |file4 |B01-02 |
| 06 | B01 |foo |bar |file4 |B01-03 |
| 09 | B01 |2021-12-06|null |file4 |B01-04 |
+-----------+----------+----------+--------+--------+--------+
One physical file contains multiple logical files, each consisting of a header record (01), detail records (06), and a trailer record (09); sometimes there are only a header and a trailer.
I want to take the date from the trailer of every logical file and add it as a column to that whole block of records, as shown below.
+-----------+----------+----------+--------+--------+--------+----------+
|c1 | c2 | c3 | c4 | c5 | c6 | c7 |
+-----------+----------+----------+--------+--------+--------+----------+
| 01 | B01 |null |null |file1 |B01-01 |2021-12-07|
| 06 | B01 |foo |bar |file1 |B01-02 |2021-12-07|
| 06 | B01 |foo |bar |file1 |B01-03 |2021-12-07|
| 09 | B01 |2021-12-07|null |file1 |B01-04 |2021-12-07|
| 01 | B02 |null |null |file2 |B02-01 |2021-12-05|
| 09 | B02 |2021-12-05|null |file2 |B02-02 |2021-12-05|
| 01 | B03 |null |null |file3 |B03-01 |2021-12-07|
| 06 | B03 |foo |bar |file3 |B03-02 |2021-12-07|
| 06 | B03 |foo |bar |file3 |B03-03 |2021-12-07|
| 09 | B03 |2021-12-07|null |file3 |B03-04 |2021-12-07|
| 01 | B01 |null |null |file4 |B01-01 |2021-12-06|
| 06 | B01 |foo |bar |file4 |B01-02 |2021-12-06|
| 06 | B01 |foo |bar |file4 |B01-03 |2021-12-06|
| 09 | B01 |2021-12-06|null |file4 |B01-04 |2021-12-06|
+-----------+----------+----------+--------+--------+--------+----------+
I tried the Window functionality with rowsBetween using unboundedPreceding and unboundedFollowing, but couldn't get anywhere.
Upvotes: 0
Views: 567
Reputation: 61
I was able to solve this using a Window:
from pyspark.sql import functions as sf
from pyspark.sql.window import Window

# One frame per physical file (c5), spanning the whole partition;
# ordering by c1 puts the trailer record (09) last in each frame.
w = Window.partitionBy('c5').orderBy('c1').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
new_df = df.withColumn('c7', sf.last('c3').over(w))
Grouped by c5, then picked the last value of c3 and added it as the new column c7.
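Note that this relies on the trailer sorting last when c1 is ordered as a string. If you would rather not depend on the sort order, a conditional aggregate over an unordered window gives the same result; a minimal sketch against the same df:
from pyspark.sql import functions as sf
from pyspark.sql.window import Window

# Unordered window per physical file; the frame is the entire partition.
w2 = Window.partitionBy('c5')

# c3 is taken only from trailer rows (c1 == '09'), null otherwise;
# max ignores nulls, so every row receives its block's trailer date.
new_df = df.withColumn('c7', sf.max(sf.when(sf.col('c1') == '09', sf.col('c3'))).over(w2))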
Upvotes: 0
Reputation: 3242
You can filter the trailer records from the original df and then rename the c3 column to c7. Finally, join the original dataframe and the filtered dataframe on the filename column c5.
from pyspark.sql import functions as F

data = [("01", "B01", None, None, "file1", "B01-01"),
        ("06", "B01", "foo", "bar", "file1", "B01-02"),
        ("06", "B01", "foo", "bar", "file1", "B01-03"),
        ("09", "B01", "2021-12-07", None, "file1", "B01-04"),
        ("01", "B02", None, None, "file2", "B02-01"),
        ("09", "B02", "2021-12-05", None, "file2", "B02-02"),
        ("01", "B03", None, None, "file3", "B03-01"),
        ("06", "B03", "foo", "bar", "file3", "B03-02"),
        ("06", "B03", "foo", "bar", "file3", "B03-03"),
        ("09", "B03", "2021-12-07", None, "file3", "B03-04"),
        ("01", "B01", None, None, "file4", "B01-01"),
        ("06", "B01", "foo", "bar", "file4", "B01-02"),
        ("06", "B01", "foo", "bar", "file4", "B01-03"),
        ("09", "B01", "2021-12-06", None, "file4", "B01-04")]
df = spark.createDataFrame(data, ("c1", "c2", "c3", "c4", "c5", "c6"))

# Keep only the trailer records (c1 == "09") and expose their date as c7.
df_trailer = df.filter(F.col("c1") == "09").selectExpr("c5", "c3 as c7")

# Attach each file's trailer date to every record sharing the same c5.
df.join(df_trailer, ["c5"]).show()
+-----+---+---+----------+----+------+----------+
| c5| c1| c2| c3| c4| c6| c7|
+-----+---+---+----------+----+------+----------+
|file1| 01|B01| null|null|B01-01|2021-12-07|
|file1| 06|B01| foo| bar|B01-02|2021-12-07|
|file1| 06|B01| foo| bar|B01-03|2021-12-07|
|file1| 09|B01|2021-12-07|null|B01-04|2021-12-07|
|file2| 01|B02| null|null|B02-01|2021-12-05|
|file2| 09|B02|2021-12-05|null|B02-02|2021-12-05|
|file3| 01|B03| null|null|B03-01|2021-12-07|
|file3| 06|B03| foo| bar|B03-02|2021-12-07|
|file3| 06|B03| foo| bar|B03-03|2021-12-07|
|file3| 09|B03|2021-12-07|null|B03-04|2021-12-07|
|file4| 01|B01| null|null|B01-01|2021-12-06|
|file4| 06|B01| foo| bar|B01-02|2021-12-06|
|file4| 06|B01| foo| bar|B01-03|2021-12-06|
|file4| 09|B01|2021-12-06|null|B01-04|2021-12-06|
+-----+---+---+----------+----+------+----------+
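Since df_trailer ends up with only one row per physical file, it is usually small enough to broadcast; reusing df and df_trailer from above, a broadcast hint keeps the join from shuffling the full dataframe:
# Broadcast the small trailer dataframe so the join avoids a shuffle.
df.join(F.broadcast(df_trailer), ["c5"]).show()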
Upvotes: 2