Reputation: 13
I have a table with a lot of records (6+ million), but most of the rows for each ID are identical.
Example:
Row | Date | ID | Col1 | Col2 | Col3 | Col4 | Col5 |
---|---|---|---|---|---|---|---|
1 | 01-01-2021 | 1 | a | b | c | d | e |
2 | 02-01-2021 | 1 | a | b | c | d | x |
3 | 03-01-2021 | 1 | a | b | c | d | x |
4 | 04-01-2021 | 1 | a | b | c | d | x |
5 | 01-01-2021 | 2 | a | b | c | d | e |
6 | 02-01-2021 | 2 | a | b | x | d | e |
7 | 03-01-2021 | 2 | a | b | x | d | e |
8 | 01-01-2021 | 3 | a | b | c | d | e |
9 | 02-01-2021 | 3 | a | b | c | d | e |
10 | 03-01-2021 | 3 | a | b | c | d | e |
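For anyone who wants to reproduce this, the sample can be built as a PySpark DataFrame roughly like this (just a sketch; dates are kept as plain strings and spark is the usual SparkSession):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data from the table above
data = [
    (1, "01-01-2021", 1, "a", "b", "c", "d", "e"),
    (2, "02-01-2021", 1, "a", "b", "c", "d", "x"),
    (3, "03-01-2021", 1, "a", "b", "c", "d", "x"),
    (4, "04-01-2021", 1, "a", "b", "c", "d", "x"),
    (5, "01-01-2021", 2, "a", "b", "c", "d", "e"),
    (6, "02-01-2021", 2, "a", "b", "x", "d", "e"),
    (7, "03-01-2021", 2, "a", "b", "x", "d", "e"),
    (8, "01-01-2021", 3, "a", "b", "c", "d", "e"),
    (9, "02-01-2021", 3, "a", "b", "c", "d", "e"),
    (10, "03-01-2021", 3, "a", "b", "c", "d", "e"),
]
df = spark.createDataFrame(
    data, ["Row", "Date", "ID", "Col1", "Col2", "Col3", "Col4", "Col5"]
)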
To save space, but also to make querying easier, I want to create a table that only keeps a row if it changed compared to the previous row for the same ID (ignoring the date).
For the table above, the result would look like this:
Row | Date | ID | Col1 | Col2 | Col3 | Col4 | Col5 |
---|---|---|---|---|---|---|---|
1 | 01-01-2021 | 1 | a | b | c | d | e |
2 | 02-01-2021 | 1 | a | b | c | d | x |
5 | 01-01-2021 | 2 | a | b | c | d | e |
6 | 02-01-2021 | 2 | a | b | x | d | e |
8 | 01-01-2021 | 3 | a | b | c | d | e |
I am trying to do this in PySpark and have had some success with a single column using LAG, but I am having trouble when there are more columns (there are about 20 in my own table). I would prefer to do this in PySpark, but a working version in SQL or Python would also work!
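In case it helps, a single-column version of the idea with LAG might look roughly like this (just a sketch, filtering on Col5 as an example and assuming df is the sample above):
from pyspark.sql import functions as F, Window

# sketch: keep a row when Col5 differs from the previous row of the same ID
w = Window.partitionBy('ID').orderBy('Date')
df_single = (
    df.withColumn('prev_col5', F.lag('Col5').over(w))
      .filter(F.col('prev_col5').isNull() | (F.col('Col5') != F.col('prev_col5')))
      .drop('prev_col5')
)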
I was wondering if there are better ways to do this.
Upvotes: 0
Views: 149
Reputation: 76
If your table has the "Row" column, you can do this quickly in SQL by joining each row to the previous row of the same ID and keeping it when there is no previous row or when any of the columns changed:
SELECT t1.*
FROM MyTable t1
LEFT JOIN MyTable t2 ON t2.Row=t1.Row-1 AND t2.ID=t1.ID
WHERE t2.Row IS NULL OR t1.Col1<>t2.Col1 OR t1.Col2<>t2.Col2 OR t1.Col3<>t2.Col3 OR t1.Col4<>t2.Col4 OR t1.Col5<>t2.Col5
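Since you said you would prefer PySpark: one option (a sketch, assuming a SparkSession spark and a DataFrame df with the columns shown in the question) is to register the DataFrame as a temp view and pass the same query to spark.sql:
# sketch: run the SQL above through Spark
df.createOrReplaceTempView("MyTable")
result = spark.sql("""
    SELECT t1.*
    FROM MyTable t1
    LEFT JOIN MyTable t2
      ON t2.Row = t1.Row - 1 AND t2.ID = t1.ID
    WHERE t2.Row IS NULL
       OR t1.Col1 <> t2.Col1 OR t1.Col2 <> t2.Col2 OR t1.Col3 <> t2.Col3
       OR t1.Col4 <> t2.Col4 OR t1.Col5 <> t2.Col5
""")
result.show()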
Upvotes: 0
Reputation: 42352
You can create a lagged array column of all columns of interest and compare it to the current row, then do a filter:
from pyspark.sql import functions as F, Window
cols = df.columns[3:]  # Col1 ... Col5 (skip Row, Date, ID)
w = Window.partitionBy('ID').orderBy('Date')
df2 = df.withColumn(
    'diff',
    F.coalesce(
        F.lag(F.array(*cols)).over(w) != F.array(*cols),
        F.lit(True)  # take care of the first row in each ID, where the lag is null
    )
).filter('diff').drop('diff')
df2.show()
+---+----------+---+----+----+----+----+----+
|Row| Date| ID|Col1|Col2|Col3|Col4|Col5|
+---+----------+---+----+----+----+----+----+
| 1|01-01-2021| 1| a| b| c| d| e|
| 2|02-01-2021| 1| a| b| c| d| x|
| 5|01-01-2021| 2| a| b| c| d| e|
| 6|02-01-2021| 2| a| b| x| d| e|
| 8|01-01-2021| 3| a| b| c| d| e|
+---+----------+---+----+----+----+----+----+
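Since your real table has about 20 columns, you may prefer to pick the compared columns by name rather than by position; a sketch (assuming Row, Date and ID are the only columns to exclude), with the rest of the code unchanged:
# sketch: exclude the key/bookkeeping columns by name instead of slicing by position
key_cols = ['Row', 'Date', 'ID']
cols = [c for c in df.columns if c not in key_cols]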
Upvotes: 1