wushi_finger

Reputation: 31

Spark: Replicate each row but with change in one column value

How do I perform the following operation in Spark?

Initially:
+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
+-----------+-----+------+

Final result:
+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-15 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-16 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-17 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-18 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
|2020-08-19 | 6   | mno  |
+-----------+-----+------+

So, in essence, I need to duplicate each row with a change in one of the column values: for each row, add a duplicate whose date is one day earlier than the current value.

Upvotes: 3

Views: 1891

Answers (3)

Muhammed Saed

Reputation: 41

Maybe a bit late to this, but here is an answer in Python so others might find it useful.

from pyspark.sql.functions import *

Initial DF looks like this:

+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
+-----------+-----+------+
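
For reference, a minimal sketch of how that DataFrame might be built (this assumes an active SparkSession bound to the name spark, which the original answer does not show; to_date parses the string dates up front):

from pyspark.sql.functions import to_date

# `spark` is assumed to be an existing SparkSession
df = spark.createDataFrame(
    [("2020-08-16", 2, "abc"),
     ("2020-08-17", 3, "def"),
     ("2020-08-18", 4, "ghi"),
     ("2020-08-19", 5, "jkl"),
     ("2020-08-20", 6, "mno")],
    ["date", "col1", "col2"],
).withColumn("date", to_date("date", "yyyy-MM-dd"))  # convert strings into a date column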

df.withColumn("dates_array",array(col("date"),date_add(col("date"),-1))))
.drop("date")
.withColumn("date",explode("dates_array"))
.drop("dates_array")
.show()

Then you'll get what you want:

+-----------+-----+------+
|date       |col1 | col2 |
+-----------+-----+------+
|2020-08-16 | 2   | abc  |
|2020-08-15 | 2   | abc  |
|2020-08-17 | 3   | def  |
|2020-08-16 | 3   | def  |
|2020-08-18 | 4   | ghi  |
|2020-08-17 | 4   | ghi  |
|2020-08-19 | 5   | jkl  |
|2020-08-18 | 5   | jkl  |
|2020-08-20 | 6   | mno  |
|2020-08-19 | 6   | mno  |
+-----------+-----+------+
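
If you would rather avoid the temporary array column, the same idea can be written as a single projection (just a sketch against the same df; the alias keeps the exploded column named date):

# explode [date, date - 1 day] directly inside the select
df.select(
    explode(array(col("date"), date_add(col("date"), -1))).alias("date"),
    "col1",
    "col2",
).show()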

Upvotes: 0

wBob

Reputation: 14379

I was thinking union would be quite elegant for this, e.g.

// Union the two dataframes together, take 1 day away from the date
df.union(df.select(date_add($"date", -1), $"col1", $"col2"))

Full sample script where I create the test data:

import org.apache.spark.sql.functions._

val dfOriginal = Seq(("2020-08-16", 2, "abc"), ("2020-08-17", 3, "def"), ("2020-08-18", 4, "ghi"), ("2020-08-19", 5, "jkl"), ("2020-08-20", 6, "mno"))
  .toDF("date", "col1", "col2")

val df = dfOriginal
  .select (to_date($"date", "yyyy-MM-dd").as("date"), $"col1", $"col2")

// Union the two dataframes together, take 1 day away from the date
df.union(df.select(date_add($"date", -1), $"col1", $"col2"))
  .orderBy("date", "col1", "col2")
  .show

My results:

[screenshot of the ordered output DataFrame]

Upvotes: 0

notNull

Reputation: 31470

Try the date_add function: build an array containing the date column and date-1, then explode that array.

Example:

df.show()

/*
+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-16|   2| abc|
|2020-08-17|   3| def|
+----------+----+----+
*/

import org.apache.spark.sql.functions._

df.withColumn("new_date",array(col("date"),date_add(col("date"),-1))).
drop("date").
selectExpr("explode(new_date) as date","*").
drop("new_date").
show(10,false)

/*
+----------+----+----+
|date      |col1|col2|
+----------+----+----+
|2020-08-16|2   |abc |
|2020-08-15|2   |abc |
|2020-08-17|3   |def |
|2020-08-16|3   |def |
+----------+----+----+
*/

Upvotes: 2
