Reputation: 135
Using Databricks, Spark 3.0.1
To use the legacy format, I have set: spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
I have a dataframe similar to the sample below.
Each row needs to be split into several rows based on a change in consecutive values. The other columns can be filled with null.
Sample:
+----+----+----+----+----+----+----+----+----+----+
|id |t.n1|t.n2|t.n3|t.n4|t.n5|t.n6|t.n7|t.n8|t.n9|
+----+----+----+----+----+----+----+----+----+----+
|1 |100 |100 |100 |500 |500 |500 |200 |200 |200 |
|2 |100 |100 |700 |700 |700 |100 |100 |100 |100 |
+----+----+----+----+----+----+----+----+----+----+
Expected Output:
+----+----+----+----+----+----+----+----+----+----+
|id |t.n1|t.n2|t.n3|t.n4|t.n5|t.n6|t.n7|t.n8|t.n9|
+----+----+----+----+----+----+----+----+----+----+
|1 |100 |100 |100 |Nan |Nan |Nan |Nan |Nan |Nan |
|2 |Nan |Nan |Nan |500 |500 |500 |Nan |Nan |Nan |
|3 |Nan |Nan |Nan |Nan |Nan |Nan |200 |200 |200 |
|4 |100 |100 |Nan |Nan |Nan |Nan |Nan |Nan |Nan |
|5 |Nan |Nan |700 |700 |700 |Nan |Nan |Nan |Nan |
|6 |Nan |Nan |Nan |Nan |Nan |100 |100 |100 |100 |
+----+----+----+----+----+----+----+----+----+----+
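For reference, a minimal sketch to reproduce the sample input (an assumption on my side: the struct-style names t.n1 ... t.n9 are simplified here to plain n1 ... n9):
import spark.implicits._
val df = Seq(
  (1, 100, 100, 100, 500, 500, 500, 200, 200, 200),
  (2, 100, 100, 700, 700, 700, 100, 100, 100, 100)
).toDF("id", "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9")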
Upvotes: 4
Views: 599
Reputation: 18098
Some good old-fashioned coding, along with a few learnable items for Scala.
Invariably the Any aspect reared its ugly head, so I used the Option approach. Note that toString() is required, as well as a particular way of building the List dynamically.
Is it functional programming? Less so than the other solution, but I was not sure how to do this with fold, etc. on nested structures. Consider it an intermediate solution, then, that may serve you well with mapPartitions (see the sketch after the output below).
Here is the code:
import spark.implicits._
import org.apache.spark.sql.functions._
import scala.util.Try
// "None".toInt fails, so the padding strings map back to None and real values to Some(n)
def tryToInt( s: String ) = Try(s.toInt).toOption
def lp( a: List[Int] ): List[List[Option[Int]]] = {
  var cnt: Int = 0
  var tempAA: List[List[Int]] = List()  // completed runs of equal consecutive values
  var tempA: List[Int] = List()         // run currently being built
  var c: Int = 0                        // previous value seen
  val sInput = a.size
  // Does not work for an empty List, but input will not be empty
  for (v <- a) {
    if (cnt > 0) {
      if (v != c) {       // value changed: close the current run, start a new one
        tempAA = tempAA :+ tempA
        tempA = List()
      }
    }
    c = v
    cnt += 1
    tempA = tempA :+ v
  }
  tempAA = tempAA :+ tempA                          // close the last run
  val numItems = tempAA.map(x => x.size)            // number of occurrences per run
  val cumCount = numItems.scanLeft(0)(_ + _).tail   // cumulative count
  var res: List[List[Option[Int]]] = List()
  var tempAAA: List[List[String]] = List()
  for (i <- 0 until numItems.length) {
    // pad each run with None on the left and right so every row keeps the full width
    val itemsLeft = cumCount(i) - numItems(i)
    val itemsRight = sInput - cumCount(i)
    val left = List.fill(itemsLeft)(None)
    val right = List.fill(itemsRight)(None)
    tempAAA = List()
    tempAAA = tempAAA :+ left.map(_.toString())
    tempAAA = tempAAA :+ tempAA(i).map(_.toString())
    tempAAA = tempAAA :+ right.map(_.toString())
    val tempAAAA = tempAAA.flatten.map(_.toString()).map(x => tryToInt(x))
    res = res :+ tempAAAA
  }
  return res
}
// collect the value columns into one array column and switch to a typed Dataset
val dataIn = Seq((1,2,2,3,5,5,5,5,5), (4,2,2,2,5,5,5,5,5), (5,5,5,5,5,5,5,5,5)).toDS()
val data = dataIn.withColumn("input", array(dataIn.columns.map(col): _*)).select($"input").as[List[Int]]
// one inner list per run, exploded back into one row per run
val df = data.rdd.map(lp).toDF().select(explode($"value"))
val n = dataIn.columns.size
df.select((0 until n).map(i => col("col")(i).alias(s"c${i+1}")): _*).show(false)
returns:
+----+----+----+----+----+----+----+----+----+
|c1 |c2 |c3 |c4 |c5 |c6 |c7 |c8 |c9 |
+----+----+----+----+----+----+----+----+----+
|1 |null|null|null|null|null|null|null|null|
|null|2 |2 |null|null|null|null|null|null|
|null|null|null|3 |null|null|null|null|null|
|null|null|null|null|5 |5 |5 |5 |5 |
|4 |null|null|null|null|null|null|null|null|
|null|2 |2 |2 |null|null|null|null|null|
|null|null|null|null|5 |5 |5 |5 |5 |
|5 |5 |5 |5 |5 |5 |5 |5 |5 |
+----+----+----+----+----+----+----+----+----+
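If you want the mapPartitions route mentioned above, here is a minimal sketch (assuming lp, data and n as defined in the listing; not part of the tested output). Applying lp inside flatMap per partition produces one list per run directly, so no separate explode step is needed:
val byPartition = data.rdd
  .mapPartitions(iter => iter.flatMap(lp))   // one List[Option[Int]] per run
  .toDF("value")
byPartition.select((0 until n).map(i => $"value"(i).alias(s"c${i+1}")): _*).show(false)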
Upvotes: 1
Reputation: 27373
Given this dataframe:
+---+---+---+---+---+---+---+---+---+---+
| id| n1| n2| n3| n4| n5| n6| n7| n8| n9|
+---+---+---+---+---+---+---+---+---+---+
| 1|100|100|100|500|500|500|200|200|200|
| 2|100|100|700|700|700|100|100|100|100|
+---+---+---+---+---+---+---+---+---+---+
I came up with a solution based on a mixture of dataframes and datasets:
val l = 9 // number of cols
df
  // put the values into an array
  .select($"id", array(df.columns.tail.map(col): _*).as("col"))
  // switch to the dataset api
  .as[(Int, Seq[Int])]
  .flatMap { case (id, arr) => {
    val arrI = arr.zipWithIndex
    // split the list into sublists based on adjacent values
    arrI.tail
      .foldLeft(Seq(Seq(arrI.head)))((acc, curr) =>
        if (acc.last.last._1 == curr._1) {
          acc.init :+ (acc.last :+ curr)
        } else {
          acc :+ Seq(curr)
        }
      )
      // aggregate the sublists into (value, from, to)
      .map(chunk => (chunk.head._1, chunk.map(_._2).min, chunk.map(_._2).max))
      // generate new lists, filled with None outside [from, to]
      .zipWithIndex
      .map { case ((num, from, to), subI) => (id, subI + 1, (0 until l).map(i => if (i >= from && i <= to) Some(num) else None)) }
  }
  }
  .toDF("id", "sub_id", "values") // back to the dataframe api
  // rename columns
  .select($"id" +: $"sub_id" +: (0 until l).map(i => $"values"(i).as(s"n${i+1}")): _*)
  .show(false)
which yields:
+---+------+----+----+----+----+----+----+----+----+----+
|id |sub_id|n1 |n2 |n3 |n4 |n5 |n6 |n7 |n8 |n9 |
+---+------+----+----+----+----+----+----+----+----+----+
|1 |1 |100 |100 |100 |null|null|null|null|null|null|
|1 |2 |null|null|null|500 |500 |500 |null|null|null|
|1 |3 |null|null|null|null|null|null|200 |200 |200 |
|2 |1 |100 |100 |null|null|null|null|null|null|null|
|2 |2 |null|null|700 |700 |700 |null|null|null|null|
|2 |3 |null|null|null|null|null|100 |100 |100 |100 |
+---+------+----+----+----+----+----+----+----+----+----+
As you can see, I have not yet managed to produce the correct id; that would need some more work. The problem is generating a consecutive id across all rows, which requires a wide transformation (a window function without partitioning) and would therefore create a performance bottleneck. A sketch of that option follows below.
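A minimal sketch of that window approach (assumptions: the dataframe shown above is bound to a val named result, and l is the column count from the listing; the single-partition shuffle caused by the unpartitioned window is exactly the bottleneck mentioned):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
// no partitionBy: Spark moves all rows into one partition to number them globally
val w = Window.orderBy($"id", $"sub_id")
result
  .withColumn("new_id", row_number().over(w))
  .select($"new_id".as("id") +: (1 to l).map(i => col(s"n$i")): _*)
  .show(false)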
Upvotes: 2