Prashy

Reputation: 61

PySpark: Split DataFrame based on contents and extract date from the last line of each split

I am reading a legacy file into a DataFrame, and it looks something like this:

+-----------+----------+----------+--------+--------+--------+
|c1         |      c2  | c3       |    c4  |    c5  |    c6  |
+-----------+----------+----------+--------+--------+--------+
| 01        |  B01     |null      |null    |file1   |B01-01  |
| 06        |  B01     |foo       |bar     |file1   |B01-02  |
| 06        |  B01     |foo       |bar     |file1   |B01-03  |
| 09        |  B01     |2021-12-07|null    |file1   |B01-04  |
| 01        |  B02     |null      |null    |file2   |B02-01  |
| 09        |  B02     |2021-12-05|null    |file2   |B02-02  |
| 01        |  B03     |null      |null    |file3   |B03-01  |
| 06        |  B03     |foo       |bar     |file3   |B03-02  |
| 06        |  B03     |foo       |bar     |file3   |B03-03  |
| 09        |  B03     |2021-12-07|null    |file3   |B03-04  |
| 01        |  B01     |null      |null    |file4   |B01-01  |
| 06        |  B01     |foo       |bar     |file4   |B01-02  |
| 06        |  B01     |foo       |bar     |file4   |B01-03  |
| 09        |  B01     |2021-12-06|null    |file4   |B01-04  |
+-----------+----------+----------+--------+--------+--------+

One physical file contains multiple logical files. Each logical file has a header (01), detail records (06), and a trailer (09); some have only a header and a trailer.

I want to take the date from the trailer of each logical file and add it as a column to every record in that block, as shown below.

+-----------+----------+----------+--------+--------+--------+----------+
|c1         |      c2  | c3       |    c4  |    c5  |    c6  | c7       |
+-----------+----------+----------+--------+--------+--------+----------+
| 01        |  B01     |null      |null    |file1   |B01-01  |2021-12-07|
| 06        |  B01     |foo       |bar     |file1   |B01-02  |2021-12-07|
| 06        |  B01     |foo       |bar     |file1   |B01-03  |2021-12-07|
| 09        |  B01     |2021-12-07|null    |file1   |B01-04  |2021-12-07|
| 01        |  B02     |null      |null    |file2   |B02-01  |2021-12-05|
| 09        |  B02     |2021-12-05|null    |file2   |B02-02  |2021-12-05|
| 01        |  B03     |null      |null    |file3   |B03-01  |2021-12-07|
| 06        |  B03     |foo       |bar     |file3   |B03-02  |2021-12-07|
| 06        |  B03     |foo       |bar     |file3   |B03-03  |2021-12-07|
| 09        |  B03     |2021-12-07|null    |file3   |B03-04  |2021-12-07|
| 01        |  B01     |null      |null    |file4   |B01-01  |2021-12-06|
| 06        |  B01     |foo       |bar     |file4   |B01-02  |2021-12-06|
| 06        |  B01     |foo       |bar     |file4   |B01-03  |2021-12-06|
| 09        |  B01     |2021-12-06|null    |file4   |B01-04  |2021-12-06|
+-----------+----------+----------+--------+--------+--------+----------+

I tried using a Window with rowsBetween, unboundedPreceding and unboundedFollowing, but couldn't get it to work.

Upvotes: 0

Views: 567

Answers (2)

Prashy

Reputation: 61

I was able to solve this using a Window:

from pyspark.sql import functions as sf
from pyspark.sql.window import Window

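# Partition by logical file (c5) and order by record type so the trailer (09) is last;
# the unbounded frame lets every row see its whole group.
w = (Window.partitionBy('c5').orderBy('c1')
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
new_df = df.withColumn('c7', sf.last('c3').over(w))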

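I grouped the rows by c5 and picked the last value of c3 in each group, then added it as a new column c7. This works because the trailer code 09 sorts after 01 and 06, so the trailer is always the last row of its group.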

Upvotes: 0

Nithish

Reputation: 3242

You can filter the trailer records out of the original DataFrame and rename their c3 column to c7. Then join the original DataFrame with the filtered one on the filename column c5.


from pyspark.sql import functions as F

# Assumes an active SparkSession named `spark`.
data = [
    ("01", "B01", None, None, "file1", "B01-01"),
    ("06", "B01", "foo", "bar", "file1", "B01-02"),
    ("06", "B01", "foo", "bar", "file1", "B01-03"),
    ("09", "B01", "2021-12-07", None, "file1", "B01-04"),
    ("01", "B02", None, None, "file2", "B02-01"),
    ("09", "B02", "2021-12-05", None, "file2", "B02-02"),
    ("01", "B03", None, None, "file3", "B03-01"),
    ("06", "B03", "foo", "bar", "file3", "B03-02"),
    ("06", "B03", "foo", "bar", "file3", "B03-03"),
    ("09", "B03", "2021-12-07", None, "file3", "B03-04"),
    ("01", "B01", None, None, "file4", "B01-01"),
    ("06", "B01", "foo", "bar", "file4", "B01-02"),
    ("06", "B01", "foo", "bar", "file4", "B01-03"),
    ("09", "B01", "2021-12-06", None, "file4", "B01-04"),
]

df = spark.createDataFrame(data, ("c1", "c2", "c3", "c4", "c5", "c6"))

# Keep only the trailer rows, then expose their date (c3) as c7 next to the join key.
df_trailer = df.filter(F.col("c1") == "09").selectExpr("c5", "c3 as c7")

df.join(df_trailer, ["c5"]).show()

Output

+-----+---+---+----------+----+------+----------+
|   c5| c1| c2|        c3|  c4|    c6|        c7|
+-----+---+---+----------+----+------+----------+
|file1| 01|B01|      null|null|B01-01|2021-12-07|
|file1| 06|B01|       foo| bar|B01-02|2021-12-07|
|file1| 06|B01|       foo| bar|B01-03|2021-12-07|
|file1| 09|B01|2021-12-07|null|B01-04|2021-12-07|
|file2| 01|B02|      null|null|B02-01|2021-12-05|
|file2| 09|B02|2021-12-05|null|B02-02|2021-12-05|
|file3| 01|B03|      null|null|B03-01|2021-12-07|
|file3| 06|B03|       foo| bar|B03-02|2021-12-07|
|file3| 06|B03|       foo| bar|B03-03|2021-12-07|
|file3| 09|B03|2021-12-07|null|B03-04|2021-12-07|
|file4| 01|B01|      null|null|B01-01|2021-12-06|
|file4| 06|B01|       foo| bar|B01-02|2021-12-06|
|file4| 06|B01|       foo| bar|B01-03|2021-12-06|
|file4| 09|B01|2021-12-06|null|B01-04|2021-12-06|
+-----+---+---+----------+----+------+----------+

Upvotes: 2
