Reputation: 1092
I need to perform a grouping operation on a DataFrame, do some manipulation at the Row level, and finally create a new DataFrame.
Input table :
inputDF.show()
+------------+-----------------+--------------------+
|ckey |twkey | s_date |
+------------+-----------------+--------------------+
| 99227100| 42222245|2018-04-26 |
| 99227100| 42222245|2018-05-01 |
| 34011381| 42830849|2015-12-20 |
| 34011381| 42830849|2016-11-27 |
| 34011381| 42830849|2016-12-19 |
| 34011381| 42830849|2017-08-05 |
+------------+-----------------+--------------------+
Here is the code I have tried, but it returns an empty DataFrame:
def encoder(columns: Seq[String]): Encoder[Row] = RowEncoder(StructType(columns.map(StructField(_, StringType, nullable = true))))
val outputCols = Seq("ckey","twkey","s_date")
val result = inputDF.groupByKey(_.get("ckey"))
.flatMapGroups((_, rowsForEach) => {
var list1:List[Row] = List()
var list2:List[Row] = List()
for (elem <- rowsForEach) {
val newRow = elem
println(newRow.getAs[String]("ckey"))
//if(list1.isEmpty) /*some more conditions to check*/ list2 :+ newRow
//else /*some more conditions to check*/ list1 :+ newRow
list1 :+ newRow
}
list1
})(encoder(outputCols)).toDF
result.sort("ckey","twkey","s_date").show()
+------------+-----------------+--------------------+
|ckey |twkey | s_date |
+------------+-----------------+--------------------+
+------------+-----------------+--------------------+
Why is list1: List[Row] empty after the end of the loop?
Expected Result:
Since I have not implemented any if condition yet, the expected output should be the same as inputDF:
result.show()
+------------+-----------------+--------------------+
|ckey |twkey | s_date |
+------------+-----------------+--------------------+
| 99227100| 42222245|2018-04-26 |
| 99227100| 42222245|2018-05-01 |
| 34011381| 42830849|2015-12-20 |
| 34011381| 42830849|2016-11-27 |
| 34011381| 42830849|2016-12-19 |
| 34011381| 42830849|2017-08-05 |
+------------+-----------------+--------------------+
Upvotes: 1
Views: 4236
Reputation: 377
You can try the code below:
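import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val spark = SparkSession.builder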
.master("local[*]")
.appName("testApp")
.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
.getOrCreate()
import spark.implicits._
val ds = Seq(("99227100", "42222245", "2018-04-26"),
("99227100", "42222245", "2018-05-01"),
("34011381", "42830849", "2015-12-20"),
("34011381", "42830849", "2016-11-27"),
("34011381", "42830849", "2016-12-19"),
("34011381", "42830849", "2017-08-05")).toDS()
//.toDF("ckey", "twkey", "s_date")
ds.show()
def encoder(columns: Seq[String]): Encoder[Row] = RowEncoder(StructType(columns.map(StructField(_, StringType, nullable = false))))
val outputCols = Seq("ckey", "twkey", "s_date")
val result = ds.groupByKey(_._1)
.flatMapGroups((_, rowsForEach) => {
val list1 = scala.collection.mutable.ListBuffer[Row]()
for (elem <- rowsForEach) {
list1.append(Row(elem._1, elem._2, elem._3))
}
list1
})(encoder(outputCols)).toDF
result.show()
Output:
+--------+--------+----------+
| ckey| twkey| s_date|
+--------+--------+----------+
|34011381|42830849|2015-12-20|
|34011381|42830849|2016-11-27|
|34011381|42830849|2016-12-19|
|34011381|42830849|2017-08-05|
|99227100|42222245|2018-04-26|
|99227100|42222245|2018-05-01|
+--------+--------+----------+
Upvotes: 4
Reputation: 870
You're using an immutable List. The :+ operator returns a new list with the element appended, and you're ignoring that result. Instead, use:
new scala.collection.mutable.ListBuffer[T]
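As a minimal sketch of that suggestion in plain Scala (no Spark involved), the snippet below accumulates elements into a ListBuffer inside a loop. The object name ListBufferDemo and the helper collect are hypothetical and exist only for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ListBufferDemo {
  // Hypothetical helper: copies every element of an iterator into a buffer.
  def collect[T](items: Iterator[T]): List[T] = {
    val buffer = ListBuffer[T]()
    for (item <- items) {
      buffer += item // mutates the buffer in place, nothing is discarded
    }
    buffer.toList
  }

  def main(args: Array[String]): Unit = {
    println(collect(Iterator("a", "b", "c"))) // prints List(a, b, c)
  }
}
```

Here buffer can be a val, because the buffer itself is mutated and the reference never changes.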
Upvotes: 1
Reputation: 35229
TL;DR You can't use List and var like this.
Explanation:
Let's look at these parts of the code:
var list1: List[Row] = List()
...
list1 :+ newRow
Scala List is immutable. var only declares a variable, which means the reference can be reassigned, for example:
list1 = Nil
or
list1 = List(Row(42))
but that doesn't affect the mutability of the list itself. Every time you call
list1 :+ newRow
you create a new list and discard the result.
To store anything, you should reassign the result:
list1 = list1 :+ newRow
but you don't really want to append to a List in a loop, because each append copies the whole list. Prepending would be better:
list1 = newRow :: list1
but in practice just use one of the scala.collection.mutable collections.
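The three variants discussed above can be compared side by side in plain Scala, without Spark. This is only an illustrative sketch: the object name ImmutableListDemo and its method names are hypothetical, and Int elements stand in for Row.

```scala
object ImmutableListDemo {
  // The result of :+ is discarded, so acc never changes.
  def discarded(items: Seq[Int]): List[Int] = {
    var acc: List[Int] = List()
    for (x <- items) {
      acc :+ x // builds a new list and throws it away
    }
    acc
  }

  // Reassigning keeps the new list, but each append copies the list.
  def reassigned(items: Seq[Int]): List[Int] = {
    var acc: List[Int] = List()
    for (x <- items) {
      acc = acc :+ x
    }
    acc
  }

  // Prepending is constant time per element; reverse once at the end
  // to restore the original order.
  def prepended(items: Seq[Int]): List[Int] = {
    var acc: List[Int] = Nil
    for (x <- items) {
      acc = x :: acc
    }
    acc.reverse
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(1, 2, 3)
    println(discarded(input))  // prints List()
    println(reassigned(input)) // prints List(1, 2, 3)
    println(prepended(input))  // prints List(1, 2, 3)
  }
}
```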
Note:
Also, we don't normally use groupByKey with Dataset[Row]. Most of the time there are more efficient ways of dealing with it, but that is a different problem.
Upvotes: 3