wBob

Reputation: 14379

Spark Dataframe filldown

I would like to do a "filldown"-type operation on a DataFrame to remove nulls, so that the last row per itemId acts as a kind of summary row containing the last known values for each column based on the timestamp. As I'm using Azure Synapse Notebooks, the language can be Scala, PySpark, Spark SQL or even C#. However, the real data has up to millions of rows and hundreds of columns, so I need a dynamic solution that can take advantage of Spark. We can provision a big cluster, so how do I make sure we take good advantage of it?

Sample data:

// Assign sample data to dataframe
val df = Seq(
    ( 1, "10/01/2021", 1, "abc", null ),
    ( 2, "11/01/2021", 1, null, "bbb" ),
    ( 3, "12/01/2021", 1, "ccc", null ),
    ( 4, "13/01/2021", 1, null, "ddd" ),

    ( 5, "10/01/2021", 2, "eee", "fff" ),
    ( 6, "11/01/2021", 2, null, null ),
    ( 7, "12/01/2021", 2, null, null )
    ).
    toDF("eventId", "timestamp", "itemId", "attrib1", "attrib2")

df.show

Expected results with rows 4 and 7 as summary rows:

+-------+----------+------+-------+-------+
|eventId| timestamp|itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|      1|10/01/2021|     1|    abc|   null|
|      2|11/01/2021|     1|    abc|    bbb|
|      3|12/01/2021|     1|    ccc|    bbb|
|      4|13/01/2021|     1|    ccc|    ddd|
|      5|10/01/2021|     2|    eee|    fff|
|      6|11/01/2021|     2|    eee|    fff|
|      7|12/01/2021|     2|    eee|    fff|
+-------+----------+------+-------+-------+

I have reviewed this option but had trouble adapting it for my use case.

Spark / Scala: forward fill with last observation

I have a kind-of-working Spark SQL solution, but it becomes very verbose with the high number of columns, so I'm hoping for something easier to maintain:

%%sql
WITH cte AS (
SELECT
    eventId,
    itemId,
    ROW_NUMBER() OVER( PARTITION BY itemId ORDER BY timestamp ) AS rn,
    attrib1,
    attrib2
FROM df
)
SELECT
    eventId,
    itemId,
    CASE rn WHEN 1 THEN attrib1
        ELSE COALESCE( attrib1, LAST_VALUE(attrib1, true) OVER( PARTITION BY itemId ORDER BY rn ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) )
    END AS attrib1_xlast,
    CASE rn WHEN 1 THEN attrib2
        ELSE COALESCE( attrib2, LAST_VALUE(attrib2, true) OVER( PARTITION BY itemId ORDER BY rn ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) )
    END AS attrib2_xlast
    
FROM cte
ORDER BY eventId

Upvotes: 0

Views: 329

Answers (1)

koiralo

Reputation: 23109

For many columns you can build the expressions dynamically, as below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy($"itemId").orderBy($"timestamp")

// Instead of selecting columns individually, build a list of expressions:
// coalesce each column with the last non-null value seen so far in the window
val expr = df.columns
  .map(c => coalesce(col(c), last(col(c), true).over(window)).as(c))

df.select(expr: _*).show(false)

Update:

// Keep the key columns as-is and forward-fill only the attrib* columns
val mainColumns = df.columns.filterNot(_.startsWith("attrib"))
val aggColumns = df.columns.diff(mainColumns)
  .map(c => coalesce(col(c), last(col(c), true).over(window)).as(c))

df.select((mainColumns.map(col) ++ aggColumns): _*).show(false)

Result:

+-------+----------+------+-------+-------+
|eventId|timestamp |itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|1      |10/01/2021|1     |abc    |null   |
|2      |11/01/2021|1     |abc    |bbb    |
|3      |12/01/2021|1     |ccc    |bbb    |
|4      |13/01/2021|1     |ccc    |ddd    |
|5      |10/01/2021|2     |eee    |fff    |
|6      |11/01/2021|2     |eee    |fff    |
|7      |12/01/2021|2     |eee    |fff    |
+-------+----------+------+-------+-------+
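
Since the question mentions PySpark is also an option, here is a minimal sketch of the same technique in PySpark, assuming the sample DataFrame above is available as df:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Forward-fill window: last non-null value per itemId, ordered by timestamp,
# using the default frame (unbounded preceding to current row)
window = Window.partitionBy("itemId").orderBy("timestamp")

# Keep the key columns untouched and forward-fill only the attrib* columns
main_cols = [F.col(c) for c in df.columns if not c.startswith("attrib")]
fill_cols = [
    F.coalesce(F.col(c), F.last(c, ignorenulls=True).over(window)).alias(c)
    for c in df.columns
    if c.startswith("attrib")
]

df.select(main_cols + fill_cols).show(truncate=False)

Because the window is partitioned by itemId, Spark spreads the work across the distinct itemId values, which is what lets a bigger cluster help at the millions-of-rows scale. Note that timestamp in the sample data is a dd/MM/yyyy string, so on real data you would want to order by a proper date or timestamp column.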

Upvotes: 1
