arunb2w

Reputation: 1196

Retain previous value of same column pyspark

I want to retrieve the latest values for each record key from a list of events.

Sample JSON Events

{"table":"TABLEA", "SORTORDER": 1, "ID": 244, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "SORTORDER": 2, "ID": 244, "COLUMNA": null, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "SORTORDER": 3, "ID": 244, "COLUMNA": 200, "COLUMNC": "CLOSE", "changed_cols": ["ID", "COLUMNA", "COLUMNC"]}
{"table":"TABLEA", "SORTORDER": 4, "ID": 244, "COLUMNA": null, "COLUMNB": "user", "COLUMNC": "INTERIM", "COLUMND": 35000, "changed_cols": ["ID", "COLUMNA", "COLUMNB", "COLUMNC", "COLUMND"]}
{"table":"TABLEA", "SORTORDER": 5, "ID": 244, "COLUMNB": "user", "COLUMNC": "OPEN", "changed_cols": ["ID", "COLUMNB", "COLUMNC"]}
{"table":"TABLEA", "SORTORDER": 6, "ID": 244, "COLUMNB": "user", "COLUMNC": null, "changed_cols": ["ID", "COLUMNB", "COLUMNC"]}
{"table":"TABLEA", "SORTORDER": 1, "ID": 245, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "SORTORDER": 2, "ID": 245, "COLUMNB": null, "changed_cols": ["ID", "COLUMNB"]}
{"table":"TABLEA", "SORTORDER": 3, "ID": 245, "COLUMNA": 200, "COLUMNC": "CLOSE", "changed_cols": ["ID", "COLUMNA", "COLUMNC"]}

Expected Output:

If we take the latest changed value of each column from the above events, per primary key (ID), the result should look like this:

+---+-------+-------+-------+-------+----------------------------------------+
|ID |COLUMNA|COLUMNB|COLUMNC|COLUMND|COL_LIST                                |
+---+-------+-------+-------+-------+----------------------------------------+
|244|null   |user   |null   |35000  |[ID, COLUMNA, COLUMNC, COLUMNB, COLUMND]|
|245|200.0  |null   |CLOSE  |null   |[ID, COLUMNA, COLUMNB, COLUMNC]         |
+---+-------+-------+-------+-------+----------------------------------------+

What I tried

First, I rolled up the lists of changed columns and created a super list for each row. Next, I tried to retain the previous value of each column based on a condition. Though it works for a few columns, I am not able to retain the previous values the way I expected.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

json_df = spark.read.json(input_file_path)

windowSpec = Window().partitionBy(['ID']) \
    .orderBy([F.asc('SORTORDER')])
# ranking needs to be in reverse order to use rank == 1
rankwindowSpec = Window().partitionBy(['ID']) \
    .orderBy([F.desc('SORTORDER')])

json_df = json_df.withColumn("rank", F.rank().over(rankwindowSpec)) \
    .withColumn("changed_cols_list",
                F.array_distinct(F.flatten(F.collect_list(F.col("changed_cols")).over(windowSpec))))

columns = ['COLUMNA', 'COLUMNB', 'COLUMNC', 'COLUMND']
json_df.show(truncate=False)
print("#######################")

for col in columns:
    print(col)
    json_df = json_df.withColumn(
        col,
        F.when(F.array_contains(F.col('changed_cols_list'), col),
               F.last(F.col(col)).over(windowSpec))
         .otherwise(F.lit('__DEFAULT__')))

json_df.filter(json_df.rank == 1).select('ID', 'COLUMNA', 'COLUMNB', 'COLUMNC', 'COLUMND').show(truncate=False)

My Output:

It does not work properly for column COLUMND:

+---+-------+-------+-------+-------+
|ID |COLUMNA|COLUMNB|COLUMNC|COLUMND|
+---+-------+-------+-------+-------+
|244|null   |user   |null   |null   |
+---+-------+-------+-------+-------+

Upvotes: 0

Views: 179

Answers (1)

Matt Andruff

Reputation: 5125

from pyspark.sql import functions as F

data = [("{\"table\":\"TABLEA\", \"SORTORDER\": 1, \"ID\": 244, \"COLUMNA\": 283.7, \"changed_cols\": [\"ID\", \"COLUMNA\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 2, \"ID\": 244, \"COLUMNA\": null, \"changed_cols\": [\"ID\", \"COLUMNA\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 3, \"ID\": 244, \"COLUMNA\": 200, \"COLUMNC\": \"CLOSE\", \"changed_cols\": [\"ID\", \"COLUMNA\", \"COLUMNC\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 4, \"ID\": 244, \"COLUMNA\": null, \"COLUMNB\": \"user\", \"COLUMNC\": \"INTERIM\", \"COLUMND\": 35000, \"changed_cols\": [\"ID\", \"COLUMNA\", \"COLUMNB\", \"COLUMNC\", \"COLUMND\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 5, \"ID\": 244, \"COLUMNB\": \"user\", \"COLUMNC\": \"OPEN\", \"changed_cols\": [\"ID\", \"COLUMNB\", \"COLUMNC\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 6, \"ID\": 244, \"COLUMNB\": \"user\", \"COLUMNC\": null, \"changed_cols\": [\"ID\", \"COLUMNB\", \"COLUMNC\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 1, \"ID\": 245, \"COLUMNA\": 283.7, \"changed_cols\": [\"ID\", \"COLUMNA\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 2, \"ID\": 245, \"COLUMNB\": null, \"changed_cols\": [\"ID\", \"COLUMNB\"]}"),
("{\"table\":\"TABLEA\", \"SORTORDER\": 3, \"ID\": 245, \"COLUMNA\": 200, \"COLUMNC\": \"CLOSE\", \"changed_cols\": [\"ID\", \"COLUMNA\", \"COLUMNC\"]}")]

df = spark.read.json(sc.parallelize(data))

mf = df.select( 
 F.col("*"), 
 F.expr("explode(filter( changed_cols, x -> x != 'ID' ))").alias('col_changed'),  #prevent name collision
 F.map_from_entries( # create a map: string -> Column Value
  F.array(
   *[ F.struct( F.lit( col_name ).alias("Name"), F.col(col_name).cast("string") ) for col_name in df.columns ])) 
   [F.col("col_changed")].alias("value") # dereference map to create column with changed value
).select( 
 F.col("*") , 
 F.struct( #create a struct with sortorder first so we can use it to sort on.(first element in struct is compared before other items)
  F.col("SORTORDER"), 
  F.column("value").alias("value"),
  F.col("col_changed").alias("col_changed")).alias("mystruct"),  
).groupby("col_changed","ID"
).agg( F.max("mystruct").alias("mystruct"), #find the last change.
).groupby("ID"
).pivot("mystruct.col_changed"
).agg(F.first("mystruct") # there is only one value so this is safe
)
changed_columns = mf.columns # mf.columns is read-only
changed_columns.remove("ID") # ID is an int and this isn't consistent with the other values and screws things up.
changed_columns2 = changed_columns # we need two arrays
mf.select( 
 F.col("ID"), # add back in ID
 F.array( *[ F.col(col_name+".col_changed") for col_name in changed_columns ] ).alias("changed_cols"), # create array of changed columns
 *[ F.col(col_name+".value").alias(col_name)   for col_name in changed_columns2 ] #short hand for writing out all columns must be last due to '*' operator
 ).select(
 F.col("ID"), 
 F.expr("filter( changed_cols , x -> x is not null)" ).alias("changed_cols"), #get rid of nulls
 *[ F.col(col_name).alias(col_name)   for col_name in changed_columns2 ] 
 ).show(100,False)
+---+------------------------------------+-------+-------+-------+-------+
|ID |changed_cols                        |COLUMNA|COLUMNB|COLUMNC|COLUMND|
+---+------------------------------------+-------+-------+-------+-------+
|244|[COLUMNA, COLUMNB, COLUMNC, COLUMND]|null   |user   |null   |35000  |
|245|[COLUMNA, COLUMNB, COLUMNC]         |200.0  |null   |CLOSE  |null   |
+---+------------------------------------+-------+-------+-------+-------+
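Before the step-by-step walkthrough below, one piece worth isolating is the map_from_entries lookup near the top of the query: per row it builds a map from column name to the (stringified) column value, then dereferences that map with the name in col_changed to pull out the value that actually changed. A toy sketch of just that trick (the frame, its values and the name name_to_value are invented for illustration; an interactive spark session is assumed):

toy = spark.createDataFrame([(244, 200.0, "CLOSE", "COLUMNA")],
                            ["ID", "COLUMNA", "COLUMNC", "col_changed"])

# map of column name -> stringified value, built once per row
name_to_value = F.map_from_entries(F.array(
    *[F.struct(F.lit(c).alias("Name"), F.col(c).cast("string")) for c in toy.columns]))

# look up the value of whichever column is named in col_changed
toy.select("ID", "col_changed", name_to_value[F.col("col_changed")].alias("value")).show()
+---+-----------+-----+
| ID|col_changed|value|
+---+-----------+-----+
|244|    COLUMNA|200.0|
+---+-----------+-----+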

In Depth explanation:

Here we are grabbing all the columns with col("*") so we can select them later, and making a struct. A struct is just a column that acts like a table (it has sub-elements). This turns out to be convenient, as we can pack data into it and it stays together.

.select( 
     F.col("*") , 
     F.struct( #create a struct with sortorder first so we can use it to sort on.(first element in struct is compared before other items)
      F.col("SORTORDER"), 
      F.column("value").alias("value"),
      F.col("col_changed").alias("col_changed")).alias("mystruct"),  
    ).

OK, now to get just the columns back per ID we use groupby to smush this down to the columns that have changed for this item. We are also going to use our struct friend. Structs are sorted by their first "column", then the second "column", and so on. This is convenient for us because the first field is the SORTORDER, so by taking the max we are getting the last change. It is also convenient because the struct itself carries the other embedded information we need later.

groupby("col_changed","ID"
).agg( F.max("mystruct").alias("mystruct"), #find the last change.
).groupby("ID"

Now we pivot. This takes rows and turns their values into columns. The value of each new column is provided by the aggregate function we use; luckily for us, first("mystruct") is just the struct itself (there is only one per group). Again, this will be handy later when we need to unpack the data for further munging.

    ).pivot("mystruct.col_changed"
    ).agg(F.first("mystruct") # there is only one value so this is safe
    )
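If the pivot is unfamiliar, this is roughly what it does on a toy frame (again, made-up values): each distinct col_changed value becomes its own output column, filled in by the aggregate.

toy = spark.createDataFrame(
    [(244, "COLUMNA", "200"), (244, "COLUMNB", "user")],
    ["ID", "col_changed", "value"])

# one output column per distinct col_changed value
toy.groupBy("ID").pivot("col_changed").agg(F.first("value")).show()
+---+-------+-------+
| ID|COLUMNA|COLUMNB|
+---+-------+-------+
|244|    200|   user|
+---+-------+-------+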

Next we set up some lists that aren't read-only so we can remove "ID": it's a bigint and doesn't match the types of the other struct values, which causes type-mismatch issues, so we just get rid of it. (It also doesn't seem to be of value, as it is always listed as changed.)

changed_columns = mf.columns # mf.columns is read-only
changed_columns.remove("ID") # ID is an int and this isn't consistent with the other values and screws things up.
changed_columns2 = changed_columns # we need two arrays
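If you prefer, a list comprehension does the same set-up in one go; note that changed_columns2 = changed_columns only binds a second name to the same list rather than copying it, which happens to be harmless here because the list is never modified afterwards. A sketch of the alternative:

# equivalent set-up: keep every column except ID, and take an explicit copy
changed_columns = [c for c in mf.columns if c != "ID"]
changed_columns2 = list(changed_columns)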

We bring back the ID column, then make an array of changes. This is handy compressed syntax that basically takes the array of column names and turns it into an array of the col_changed values, by referencing the struct field .col_changed. (Full example: COLUMNA.col_changed gets the struct in COLUMNA and references its col_changed field.)

mf.select( 
 F.col("ID"), # add back in ID
 F.array( *[ F.col(col_name+".col_changed") for col_name in changed_columns ] ).alias("changed_cols"), # create array of changed columns

Here we are using the same trick by dereferencing the list; hence it must come last in the select. This time we are using the .value field of the struct and adding these columns to the select as varargs.

 *[ F.col(col_name+".value").alias(col_name)   for col_name in changed_columns2 ] #short hand for writing out all columns must be last due to '*' operator

Now we just filter out the null values in the changed_cols array. We have to use an expression here to access the higher-order filter function (in newer Spark versions it is available as a regular function without expr; see the short sketch after the snippet below). And again, instead of typing out the column names, I use the same dereference of the list to pass varargs to select (again, it must be the last argument in the select).

 ).select(
 F.col("ID"), 
 F.expr("filter( changed_cols , x -> x is not null)" ).alias("changed_cols"), #get rid of nulls
 *[ F.col(col_name).alias(col_name)   for col_name in changed_columns2 ] 
 ).show(100,False)
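For what it's worth, newer PySpark versions (3.1+, if I remember the version right) expose the same higher-order function as pyspark.sql.functions.filter, so the expr() string can be dropped. A small sketch on a made-up frame:

demo = spark.createDataFrame([(244, ["COLUMNA", None, "COLUMNC"])], ["ID", "changed_cols"])

# built-in equivalent of expr("filter(changed_cols, x -> x is not null)")
demo.select("ID", F.filter("changed_cols", lambda x: x.isNotNull()).alias("changed_cols")).show(truncate=False)
+---+------------------+
|ID |changed_cols      |
+---+------------------+
|244|[COLUMNA, COLUMNC]|
+---+------------------+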

Upvotes: 1
