RaAm
RaAm

Reputation: 1092

Spark Scala groupByKey and flatMapGroups give empty dataframe

I need to perform grouping operation on a DataFrame and perform some manipulation at Row level and finally create a DataFrame.

Input table :

inputDF.show()
+------------+-----------------+--------------------+
|ckey        |twkey            | s_date             |
+------------+-----------------+--------------------+
|    99227100|         42222245|2018-04-26          |
|    99227100|         42222245|2018-05-01          |
|    34011381|         42830849|2015-12-20          |
|    34011381|         42830849|2016-11-27          |
|    34011381|         42830849|2016-12-19          |
|    34011381|         42830849|2017-08-05          |
+------------+-----------------+--------------------+

here is the code I have tried but getting a empty dataframe

def encoder(columns: Seq[String]): Encoder[Row] = RowEncoder(StructType(columns.map(StructField(_, StringType, nullable = true))))


val outputCols = Seq("ckey","twkey","s_date")


val result = inputDF.groupByKey(_.get("ckey"))
  .flatMapGroups((_, rowsForEach) => {

    var list1:List[Row] = List()
    var list2:List[Row] = List()

    for (elem <- rowsForEach) {
      val newRow = elem
      println(newRow.getAs[String]("ckey"))
      //if(list1.isEmpty) /*some more conditions to check*/ list2 :+ newRow
      //else /*some more conditions to check*/ list1 :+ newRow
      list1 :+ newRow
    }
    list1
  })(encoder(outputCols)).toDF

result.sort("ckey","twkey","s_date").show()
+------------+-----------------+--------------------+
|ckey        |twkey            | s_date             |
+------------+-----------------+--------------------+
+------------+-----------------+--------------------+

Why is the list1 : List[Row] is getting empty after end of the loop ?

Expected Result:

Since I have not implemented any if condition the expected output should be the same as inputDF

result.show()
+------------+-----------------+--------------------+
|ckey        |twkey            | s_date             |
+------------+-----------------+--------------------+
|    99227100|         42222245|2018-04-26          |
|    99227100|         42222245|2018-05-01          |
|    34011381|         42830849|2015-12-20          |
|    34011381|         42830849|2016-11-27          |
|    34011381|         42830849|2016-12-19          |
|    34011381|         42830849|2017-08-05          |
+------------+-----------------+--------------------+

Upvotes: 1

Views: 4236

Answers (3)

Souvik
Souvik

Reputation: 377

You can try this below code:

  val spark = SparkSession.builder
  .master("local[*]")
  .appName("testApp")
  .config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
  .getOrCreate()

import spark.implicits._

val ds = Seq(("99227100", "42222245", "2018-04-26"),
  ("99227100", "42222245", "2018-05-01"),
  ("34011381", "42830849", "2015-12-20"),
  ("34011381", "42830849", "2016-11-27"),
  ("34011381", "42830849", "2016-12-19"),
  ("34011381", "42830849", "2017-08-05")).toDS()

//.toDF("ckey", "twkey", "s_date")

ds.show()

def encoder(columns: Seq[String]): Encoder[Row] = RowEncoder(StructType(columns.map(StructField(_, StringType, nullable = false))))

val outputCols = Seq("ckey", "twkey", "s_date")

val result = ds.groupByKey(_._1)
  .flatMapGroups((_, rowsForEach) => {
    val list1 = scala.collection.mutable.ListBuffer[Row]()
    for (elem <- rowsForEach) {
      list1.append(Row(elem._1, elem._2, elem._3))
    }
    list1
  })(encoder(outputCols)).toDF

result.show()

Output:

+--------+--------+----------+
|    ckey|   twkey|    s_date|
+--------+--------+----------+
|34011381|42830849|2015-12-20|
|34011381|42830849|2016-11-27|
|34011381|42830849|2016-12-19|
|34011381|42830849|2017-08-05|
|99227100|42222245|2018-04-26|
|99227100|42222245|2018-05-01|
+--------+--------+----------+

Upvotes: 4

Nir Hedvat
Nir Hedvat

Reputation: 870

You're using an immutable List, += returns a new list with the added parameter (which you're ignoring). Instead, use:

new scala.collection.mutable.ListBuffer[T]

Upvotes: 1

Alper t. Turker
Alper t. Turker

Reputation: 35229

TL;DR You can use List and var like this.

Explanation:

Let's look at this parts of the code:

var list1: List[Row] = List()
...
list1 :+ newRow

Scala List is immutable. var denotes a variable. It means that the reference can resigned.

list1 = Nil

or

list1 = List(Row(42))

but it doesn't affect mutability of list. Every time you call

list1 :+ newRow

you create a new list and discard the result.

To store anything you should you should reassign the result:

list1 = list1 :+ newRow

but you don't really want to append to List in a loop. Prepending would be better

list1 = newRow :: list1

but in practice just one of the collections.mutable.

Note:

Also we don't really use groupByKey with Dataset[Row]. Most of the time there are more efficient ways of dealing with it, but it is different problem.

Upvotes: 3

Related Questions