Reputation: 509
I have a dataset with date, accountid and value columns. I want to transform it into a new dataset in which every accountid appears on every date: if an accountid is not present on a particular date, a row for it should be added with a value of 0 against that date. Is this possible?
val df = sc.parallelize(Seq(("2018-01-01", 100.5,"id1"),
("2018-01-02", 120.6,"id1"),
("2018-01-03", 450.2,"id2")
)).toDF("date", "val","accountid")
+----------+-----+---------+
| date| val|accountid|
+----------+-----+---------+
|2018-01-01|100.5| id1|
|2018-01-02|120.6| id1|
|2018-01-03|450.2| id2|
+----------+-----+---------+
I want to transform this dataset into the following format:
+----------+-----+---------+
| date| val|accountid|
+----------+-----+---------+
|2018-01-01|100.5| id1|
|2018-01-01| 0.0| id2|
|2018-01-02|120.6| id1|
|2018-01-02| 0.0| id2|
|2018-01-03|450.2| id2|
|2018-01-03| 0.0| id1|
+----------+-----+---------+
Upvotes: 0
Views: 90
Reputation: 41957
You can use a udf function to do this.
First, collect the complete set of accountids and broadcast it so that it can be used inside the udf function.
The udf function returns an array with the original record plus a zero-valued record for every other accountid; explode that array and then select the final columns.
import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val broadCastedIdList = sc.broadcast(idList)
// For each row, keep the original record and add a zero-valued record for every other accountid
val populateUdf = udf((date: String, value: Double, accountid: String) =>
  Array(accounts(date, value, accountid)) ++
    broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))
df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
.withColumn("struct", explode(col("struct")))
.select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
.show(false)
You also need a case class, which has to be defined before the udf function:
case class accounts(date:String, value:Double, accountid:String)
which should give you
+----------+-----+---------+
|date |val |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1 |
|2018-01-01|0.0 |id2 |
|2018-01-02|120.6|id1 |
|2018-01-02|0.0 |id2 |
|2018-01-03|450.2|id2 |
|2018-01-03|0.0 |id1 |
+----------+-----+---------+
Note: the case class field is named value rather than val because val is a reserved keyword in Scala and cannot be used as a plain identifier; the column is renamed back to val in the final select.
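To make the per-row logic of the udf easier to follow, here is a minimal sketch of the same expansion on plain Scala collections, without Spark. The names Account and ExpandRow.expand are hypothetical and chosen only for illustration; they are not part of the answer's code.

```scala
// Hypothetical stand-in for the answer's case class (illustration only)
final case class Account(date: String, value: Double, accountid: String)

object ExpandRow {
  // Keep the original record, then add a zero-valued record
  // for every other known accountid on the same date.
  def expand(row: Account, allIds: Seq[String]): Seq[Account] =
    row +: allIds.filterNot(_ == row.accountid).map(id => Account(row.date, 0.0, id))
}
```

Calling ExpandRow.expand(Account("2018-01-01", 100.5, "id1"), Seq("id1", "id2")) yields the original record followed by Account("2018-01-01", 0.0, "id2"). Because the expansion works row by row, a date that already has rows for two different accounts would still get a zero-valued row for each of them; the sample data has one account per date, so this does not show up there.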
Upvotes: 1
Reputation: 35229
You can create a reference dataset that pairs every date between the minimum and maximum date with every accountid:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val Row(minTs: Long, maxTs: Long) = df
.select(to_date($"date").cast("timestamp").cast("bigint") as "date")
.select(min($"date"), max($"date")).first
val by = 60 * 60 * 24
val ref = spark
.range(minTs, maxTs + by, by)
.select($"id".cast("timestamp").cast("date").cast("string").as("date"))
.crossJoin(df.select("accountid").distinct)
and then left outer join it with the input data, filling the missing values with 0.0:
ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+
// | date|accountid| val|
// +----------+---------+-----+
// |2018-01-03| id1| 0.0|
// |2018-01-01| id1|100.5|
// |2018-01-02| id2| 0.0|
// |2018-01-02| id1|120.6|
// |2018-01-03| id2|450.2|
// |2018-01-01| id2| 0.0|
// +----------+---------+-----+
Concept adapted from this sparklyr answer by user6910411.
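If only the dates that already occur in the data need to be covered, as in the question's expected output, the generated date range could be swapped for the distinct dates. This is a minimal sketch, assuming Spark 2.1 or later (for crossJoin); the object name FillMissingAccounts and the function fillMissing are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit}

object FillMissingAccounts {
  // Assumes df has the columns "date", "val" and "accountid" from the question.
  def fillMissing(df: DataFrame): DataFrame = {
    // Every date seen in the data paired with every accountid seen in the data
    val ref = df.select("date").distinct.crossJoin(df.select("accountid").distinct)

    ref
      .join(df, Seq("date", "accountid"), "left_outer")
      // Combinations with no matching input row get a val of 0.0
      .withColumn("val", coalesce(col("val"), lit(0.0)))
      .select("date", "val", "accountid")
      .orderBy("date", "accountid")
  }

  def main(args: Array[String]): Unit = {
    // Local session for trying the sketch outside spark-shell
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fill-missing-accounts")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("2018-01-01", 100.5, "id1"),
      ("2018-01-02", 120.6, "id1"),
      ("2018-01-03", 450.2, "id2")
    ).toDF("date", "val", "accountid")

    fillMissing(df).show(false)
    spark.stop()
  }
}
```

Unlike the range-based reference, this variant does not add rows for calendar days that are absent from the input altogether, so which one fits depends on whether gaps in the dates should be filled as well.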
Upvotes: 0