Masterbuilder

Reputation: 509

Transform dataset with empty data for dates

I have a dataset with date, accountid and value. I want to transform it into a new dataset where, if an accountid is not present on a particular date, a row is added for that accountid with a value of 0 against that date. Is this possible?

val df = sc.parallelize(Seq(
  ("2018-01-01", 100.5, "id1"),
  ("2018-01-02", 120.6, "id1"),
  ("2018-01-03", 450.2, "id2")
)).toDF("date", "val", "accountid")

+----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-02|120.6|      id1|
|2018-01-03|450.2|      id2|
+----------+-----+---------+

I want to transform this dataset into this format:

+----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-01|  0.0|      id2|
|2018-01-02|120.6|      id1|
|2018-01-02|  0.0|      id2|
|2018-01-03|450.2|      id2|
|2018-01-03|  0.0|      id1|
+----------+-----+---------+

Upvotes: 0

Views: 90

Answers (2)

Ramesh Maharjan

Reputation: 41957

You can use a udf function to fulfil your requirement.

Before that, you will have to collect the complete set of accountids and broadcast it so it can be used inside the udf function.

The udf returns an array of structs, which is then exploded; finally the struct fields are selected as columns.

import org.apache.spark.sql.functions._

// collect every distinct accountid and broadcast the list to the executors
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val broadCastedIdList = sc.broadcast(idList)

// for each input row, emit the original row plus a zero-valued row for
// every accountid that is missing on that date
def populateUdf = udf((date: String, value: Double, accountid: String) =>
  Array(accounts(date, value, accountid)) ++
    broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))

df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
  .withColumn("struct", explode(col("struct")))
  .select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
  .show(false)

And of course you need the case class (define it before the udf if you are entering the code line by line in spark-shell):

case class accounts(date: String, value: Double, accountid: String)

which should give you

+----------+-----+---------+
|date      |val  |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1      |
|2018-01-01|0.0  |id2      |
|2018-01-02|120.6|id1      |
|2018-01-02|0.0  |id2      |
|2018-01-03|450.2|id2      |
|2018-01-03|0.0  |id1      |
+----------+-----+---------+

Note: the case class field is named value rather than val because val is a reserved word in Scala and cannot be used as a plain identifier.
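
If you would rather keep the original column name, Scala's backtick escaping should work as well. A minimal sketch of that variation (not part of the original answer):

// hypothetical variation: backticks let the reserved word serve as an identifier,
// so the struct field (and hence the output column) is named val directly
case class accounts(date: String, `val`: Double, accountid: String)

The final select could then use col("struct.val") without the .as("val") rename.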

Upvotes: 1

Alper t. Turker

Reputation: 35229

You can create a reference DataFrame covering the full date range and all accountids:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

// find the minimum and maximum date as epoch seconds
val Row(minTs: Long, maxTs: Long) = df
  .select(to_date($"date").cast("timestamp").cast("bigint") as "date")
  .select(min($"date"), max($"date")).first

val by = 60 * 60 * 24  // one day, in seconds

// one row per day in the range, cross-joined with every accountid
val ref = spark
  .range(minTs, maxTs + by, by)
  .select($"id".cast("timestamp").cast("date").cast("string").as("date"))
  .crossJoin(df.select("accountid").distinct)
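
With the sample data above, ref should now hold the full date-by-accountid grid, roughly as below (row order is not guaranteed):

ref.show
// +----------+---------+
// |      date|accountid|
// +----------+---------+
// |2018-01-01|      id1|
// |2018-01-01|      id2|
// |2018-01-02|      id1|
// |2018-01-02|      id2|
// |2018-01-03|      id1|
// |2018-01-03|      id2|
// +----------+---------+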

and outer join with input data:

// missing (date, accountid) pairs come back with a null val; fill those with 0.0
ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+
// |      date|accountid|  val|
// +----------+---------+-----+
// |2018-01-03|      id1|  0.0|
// |2018-01-01|      id1|100.5|
// |2018-01-02|      id2|  0.0|
// |2018-01-02|      id1|120.6|
// |2018-01-03|      id2|450.2|
// |2018-01-01|      id2|  0.0|
// +----------+---------+-----+
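
If the joined frame had other nullable numeric columns that should stay untouched, na.fill can be restricted to the val column; a small variation on the line above:

ref.join(df, Seq("date", "accountid"), "leftouter")
  .na.fill(0.0, Seq("val"))
  .show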

Concept adapted from this sparklyr answer by user6910411.

Upvotes: 0
