Reputation: 31
How can I perform the following operation in Spark?
Initially:
+-----------+-----+------+
|date |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2 | abc |
|2020-08-17 | 3 | def |
|2020-08-18 | 4 | ghi |
|2020-08-19 | 5 | jkl |
|2020-08-20 | 6 | mno |
+-----------+-----+------+
Final result:
+-----------+-----+------+
|date |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2 | abc |
|2020-08-15 | 2 | abc |
|2020-08-17 | 3 | def |
|2020-08-16 | 3 | def |
|2020-08-18 | 4 | ghi |
|2020-08-17 | 4 | ghi |
|2020-08-19 | 5 | jkl |
|2020-08-18 | 5 | jkl |
|2020-08-20 | 6 | mno |
|2020-08-19 | 6 | mno |
+-----------+-----+------+
So in essence, I need to duplicate each row with a change in one of the column values: for each row, add a duplicate whose date column is one day before the current value.
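For reference, the input can be reproduced with the following (a sketch, assuming an active SparkSession named spark):
from pyspark.sql.functions import col, to_date

# Sample input; to_date casts the ISO strings to a proper DateType
df = spark.createDataFrame(
    [("2020-08-16", 2, "abc"),
     ("2020-08-17", 3, "def"),
     ("2020-08-18", 4, "ghi"),
     ("2020-08-19", 5, "jkl"),
     ("2020-08-20", 6, "mno")],
    ["date", "col1", "col2"],
).withColumn("date", to_date(col("date"), "yyyy-MM-dd"))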
Upvotes: 3
Views: 1891
Reputation: 41
Maybe a bit late for this, but I'm answering in Python so others might find it useful.
from pyspark.sql.functions import array, col, date_add, explode
Initial DF looks like this:
+-----------+-----+------+
|date |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2 | abc |
|2020-08-17 | 3 | def |
|2020-08-18 | 4 | ghi |
|2020-08-19 | 5 | jkl |
|2020-08-20 | 6 | mno |
+-----------+-----+------+
df.withColumn("dates_array",array(col("date"),date_add(col("date"),-1))))
.drop("date")
.withColumn("date",explode("dates_array"))
.drop("dates_array")
.show()
Then you'll get what you want:
+-----------+-----+------+
|date |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2 | abc |
|2020-08-15 | 2 | abc |
|2020-08-17 | 3 | def |
|2020-08-16 | 3 | def |
|2020-08-18 | 4 | ghi |
|2020-08-17 | 4 | ghi |
|2020-08-19 | 5 | jkl |
|2020-08-18 | 5 | jkl |
|2020-08-20 | 6 | mno |
|2020-08-19 | 6 | mno |
+-----------+-----+------+
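A more compact equivalent collapses this into a single withColumn, since withColumn with an existing column name replaces it in place (a sketch of the same array-plus-explode idea):
# Replace "date" with the exploded pair (date, date - 1) in one step
df.withColumn("date", explode(array(col("date"), date_add(col("date"), -1)))).show()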
Upvotes: 0
Reputation: 14379
I was thinking union would be quite elegant for this, e.g.:
// Union the two dataframes together, take 1 day away from the date
df.union(df.select(date_add($"date", -1), $"col1", $"col2"))
Full sample script where I create the test data:
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF and the $ column syntax (available on the SparkSession in spark-shell)

val dfOriginal = Seq(
  ("2020-08-16", 2, "abc"),
  ("2020-08-17", 3, "def"),
  ("2020-08-18", 4, "ghi"),
  ("2020-08-19", 5, "jkl"),
  ("2020-08-20", 6, "mno")
).toDF("date", "col1", "col2")

// Cast the string column to a proper DateType
val df = dfOriginal
  .select(to_date($"date", "yyyy-MM-dd").as("date"), $"col1", $"col2")

// Union the two dataframes together, take 1 day away from the date
df.union(df.select(date_add($"date", -1), $"col1", $"col2"))
  .orderBy("date", "col1", "col2")
  .show
My results:
+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-15|   2| abc|
|2020-08-16|   2| abc|
|2020-08-16|   3| def|
|2020-08-17|   3| def|
|2020-08-17|   4| ghi|
|2020-08-18|   4| ghi|
|2020-08-18|   5| jkl|
|2020-08-19|   5| jkl|
|2020-08-19|   6| mno|
|2020-08-20|   6| mno|
+----------+----+----+
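For PySpark users, the same union approach would look roughly like this (a sketch; PySpark's date_add also accepts the column name as a string):
from pyspark.sql.functions import date_add

# Append a copy of every row with the date shifted back one day
df.union(df.select(date_add("date", -1).alias("date"), "col1", "col2")) \
  .orderBy("date", "col1", "col2") \
  .show()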
Upvotes: 0
Reputation: 31470
Try the date_add function: build an array from the date column and the date-1 column, then explode that array.
Example:
df.show()
/*
+----------+----+----+
| date|col1|col2|
+----------+----+----+
|2020-08-16| 2| abc|
|2020-08-17| 3| def|
+----------+----+----+
*/
import org.apache.spark.sql.functions._
df.withColumn("new_date",array(col("date"),date_add(col("date"),-1))).
drop("date").
selectExpr("explode(new_date) as date","*").
drop("new_date").
show(10,false)
/*
+----------+----+----+
|date |col1|col2|
+----------+----+----+
|2020-08-16|2 |abc |
|2020-08-15|2 |abc |
|2020-08-17|3 |def |
|2020-08-16|3 |def |
+----------+----+----+
*/
Upvotes: 2