Reputation: 8711
I have a dataframe like below and want to reduce it by combining adjacent rows, i.e. where previous.close = current.open:
val df = Seq(
("Ray","2018-09-01","2018-09-10"),
("Ray","2018-09-10","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-27"),
("Ray","2018-09-27","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-24","2018-09-28"),
("Scott","2018-09-28","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-15"),
("Scott","2018-10-15","2018-09-20")
)
The required output is below:
(("Ray","2018-09-01","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-24","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-20"))
So far, I'm able to condense pairs of adjacent rows using the DataFrame solution below:
df.alias("t1").join(df.alias("t2"),$"t1.name" === $"t2.name" and $"t1.close"=== $"t2.open" )
.select("t1.name","t1.open","t2.close")
.distinct.show(false)
+-----+----------+----------+
|name |open      |close     |
+-----+----------+----------+
|Scott|2018-09-24|2018-09-30|
|Scott|2018-10-11|2018-10-20|
|Ray  |2018-09-01|2018-09-15|
|Ray  |2018-09-21|2018-09-30|
+-----+----------+----------+
I'm trying to use a similar approach with $"t1.close" =!= $"t2.open" to get the single (unmerged) rows, and then union the two results to get the final output. But I get unwanted rows that I'm not able to filter out correctly. How can I achieve this?
This post is not the same as Spark SQL window function with complex condition, where an additional date column is calculated as a new column.
Upvotes: 1
Views: 489
Reputation: 380
UPDATED: the code is now tested :-)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{min, when}
val df = Seq(
("Ray","2018-09-01","2018-09-10"),
("Ray","2018-09-10","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-27"),
("Ray","2018-09-27","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-23","2018-09-28"), // <-- Revised
("Scott","2018-09-28","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-15"),
("Scott","2018-10-15","2018-10-20")
).toDF("name", "open", "close")
// <- only compare the dates within a given name, and for each row also look at the previous one
val window = Window.partitionBy("name").orderBy($"open").rowsBetween(-1, Window.currentRow)
df.select(
    $"name", $"open", $"close",
    min($"close").over(window) as "closeBefore_tmp" // <- get the smaller close value (that of the previous row)
  )
  .withColumn("closeBefore", when($"closeBefore_tmp" === $"close", null).otherwise($"closeBefore_tmp")) // <- no previous row in this case: it's the first row for this name, so set closeBefore to null
  .createOrReplaceTempView("tmp")
Now you can compare open and closeBefore.
Upvotes: 1
Reputation: 22449
Here's one approach:
1. Create a new column temp1 with null value if the current open equals the previous close; otherwise the value of the current open
2. Create a column temp2 that backfills the nulls in temp1 with the last non-null value
3. Group by (name, temp2) to generate the contiguous date ranges

I've revised your sample data to cover cases of a contiguous date range spanning 2+ rows.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
("Ray","2018-09-01","2018-09-10"),
("Ray","2018-09-10","2018-09-15"),
("Ray","2018-09-16","2018-09-18"),
("Ray","2018-09-21","2018-09-27"),
("Ray","2018-09-27","2018-09-30"),
("Scott","2018-09-21","2018-09-23"),
("Scott","2018-09-23","2018-09-28"), // <-- Revised
("Scott","2018-09-28","2018-09-30"),
("Scott","2018-10-05","2018-10-09"),
("Scott","2018-10-11","2018-10-15"),
("Scott","2018-10-15","2018-10-20")
).toDF("name", "open", "close")
val win = Window.partitionBy($"name").orderBy("open", "close")
val df2 = df.
  withColumn("temp1", when(
    row_number.over(win) === 1 || lag($"close", 1).over(win) =!= $"open", $"open")
  ).
  withColumn("temp2", last($"temp1", ignoreNulls=true).over(
    win.rowsBetween(Window.unboundedPreceding, 0)
  ))
df2.show
// +-----+----------+----------+----------+----------+
// | name|      open|     close|     temp1|     temp2|
// +-----+----------+----------+----------+----------+
// |Scott|2018-09-21|2018-09-23|2018-09-21|2018-09-21|
// |Scott|2018-09-23|2018-09-28|      null|2018-09-21|
// |Scott|2018-09-28|2018-09-30|      null|2018-09-21|
// |Scott|2018-10-05|2018-10-09|2018-10-05|2018-10-05|
// |Scott|2018-10-11|2018-10-15|2018-10-11|2018-10-11|
// |Scott|2018-10-15|2018-10-20|      null|2018-10-11|
// |  Ray|2018-09-01|2018-09-10|2018-09-01|2018-09-01|
// |  Ray|2018-09-10|2018-09-15|      null|2018-09-01|
// |  Ray|2018-09-16|2018-09-18|2018-09-16|2018-09-16|
// |  Ray|2018-09-21|2018-09-27|2018-09-21|2018-09-21|
// |  Ray|2018-09-27|2018-09-30|      null|2018-09-21|
// +-----+----------+----------+----------+----------+
The above shows the result of steps 1 and 2, with temp2 holding the value of the earliest open of the corresponding contiguous date range. Step 3 uses max to get the latest close of the date range:
df2.
  groupBy($"name", $"temp2".as("open")).agg(max($"close").as("close")).
  show
// +-----+----------+----------+
// |name |open      |close     |
// +-----+----------+----------+
// |Scott|2018-09-21|2018-09-30|
// |Scott|2018-10-05|2018-10-09|
// |Scott|2018-10-11|2018-10-20|
// |Ray  |2018-09-01|2018-09-15|
// |Ray  |2018-09-16|2018-09-18|
// |Ray  |2018-09-21|2018-09-30|
// +-----+----------+----------+
Upvotes: 2