bytecode

Reputation: 71

Merge Spark dataframe rows based on key column in Scala

I have a streaming DataFrame with two columns: a key column of type String, and an objects column holding an array that contains a single object element. I want to merge the rows that share the same key so that each merged row holds an array of all the objects for that key.

Dataframe

----------------------------------------------------------------
|key    | objects                                              |
----------------------------------------------------------------
|abc    | [{"name": "file", "type": "sample", "code": "123"}]  |
|abc    | [{"name": "image", "type": "sample", "code": "456"}] |
|xyz    | [{"name": "doc", "type": "sample", "code": "707"}]   |
----------------------------------------------------------------


Merged Dataframe

-------------------------------------------------------------------------------------------------------------------
|key    | objects                                                                                                 |
-------------------------------------------------------------------------------------------------------------------
|abc    | [{"name": "file", "type": "sample", "code": "123"}, {"name": "image", "type": "sample", "code": "456"}] |
|xyz    | [{"name": "doc", "type": "sample", "code": "707"}]                                                      |
-------------------------------------------------------------------------------------------------------------------

One option is to convert this into a pair RDD and apply the reduceByKey function, but I'd prefer to do this with DataFrames if possible, since that should be more efficient. Is there any way to do this with DataFrames without compromising on performance?

Upvotes: 0

Views: 2029

Answers (1)

Leo C

Reputation: 22449

Assuming the objects column is an array containing a single JSON string, here's how you can merge the objects by key:

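import org.apache.spark.sql.functions._
// Needed for toDF and the $"col" syntax; spark-shell imports this automatically.
import spark.implicits._

case class Obj(name: String, `type`: String, code: String)

// Sample data: each row carries a one-element array holding a JSON string.
val df = Seq(
    ("abc", Obj("file", "sample", "123")),
    ("abc", Obj("image", "sample", "456")),
    ("xyz", Obj("doc", "sample", "707"))
  ).
  toDF("key", "object").
  select($"key", array(to_json($"object")).as("objects"))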

df.show(false)
// +---+-----------------------------------------------+
// |key|objects                                        |
// +---+-----------------------------------------------+
// |abc|[{"name":"file","type":"sample","code":"123"}] |
// |abc|[{"name":"image","type":"sample","code":"456"}]|
// |xyz|[{"name":"doc","type":"sample","code":"707"}]  |
// +---+-----------------------------------------------+

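// Take the single element of each array and collect the elements per key.
// Neither the row order nor the element order within a group is guaranteed.
df.groupBy($"key").agg(collect_list($"objects"(0)).as("objects")).
  show(false)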
// +---+---------------------------------------------------------------------------------------------+
// |key|objects                                                                                      |
// +---+---------------------------------------------------------------------------------------------+
// |xyz|[{"name":"doc","type":"sample","code":"707"}]                                                |
// |abc|[{"name":"file","type":"sample","code":"123"}, {"name":"image","type":"sample","code":"456"}]|
// +---+---------------------------------------------------------------------------------------------+
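
The aggregation above picks element 0 of each array, so it relies on every array holding exactly one object. If some rows might carry several objects, a possible variant is to collect the whole arrays and flatten the result. This is only a sketch, assuming Spark 2.4 or later (where `flatten` is available in `org.apache.spark.sql.functions`) and a local session; the app name and sample rows are illustrative, not taken from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, flatten}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("merge-by-key") // hypothetical name
  .getOrCreate()
import spark.implicits._

// Each row holds an array of JSON strings (here one element per row).
val df = Seq(
  ("abc", Seq("""{"name":"file","type":"sample","code":"123"}""")),
  ("abc", Seq("""{"name":"image","type":"sample","code":"456"}""")),
  ("xyz", Seq("""{"name":"doc","type":"sample","code":"707"}"""))
).toDF("key", "objects")

// collect_list yields array<array<string>>; flatten merges it into array<string>.
val merged = df
  .groupBy($"key")
  .agg(flatten(collect_list($"objects")).as("objects"))

merged.show(false)
```

On a streaming DataFrame, an aggregation like this generally needs a suitable output mode (complete or update), or a watermark if append mode is required, so the sketch would have to be adapted to the actual query.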

Upvotes: 1
