ilyazakharov

Reputation: 288

How to join using a nested column in Spark dataframe

I have one dataframe with this schema:

|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)

Data:

+-----------+-----------+--------------------------------------------------+
|Activity_A1|Activity_A2|Details                                           |
+-----------+-----------+--------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1,Agr2_Attr2], [Agr1_Attr1,Agr1_Attr2]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1,Agr4_Attr2], [Agr3_Attr1,Agr3_Attr2]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1,Agr5_Attr2]]                         |
+-----------+-----------+--------------------------------------------------+

And the second one with this schema:

|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Line_A1: string (nullable = true)
|    |    |-- Line_A2: string (nullable = true)

How can I join these two dataframes on the Agreement_A1 column, so that the schema of the resulting dataframe looks like this:

|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)
|    |    |-- Lines: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Line_A1: string (nullable = true)
|    |    |    |    |-- Line_A2: string (nullable = true)

Upvotes: 2

Views: 1850

Answers (1)

C.S.Reddy Gadipally

Reputation: 1758

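Hope this helps. You need to explode (unnest) "Details" so that each array element becomes its own row, join that with your second dataframe on "Agreement_A1", and then rebuild the nested structure with struct and collect_list. Note that the join below is an inner join, so details with no match in the second dataframe (Agr2_Attr1 in this data) are dropped from the result.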

scala> df1.show(false)
+-----------+-----------+----------------------------------------------------+
|Activity_A1|Activity_A2|Details                                             |
+-----------+-----------+----------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1, Agr2_Attr2], [Agr1_Attr1, Agr1_Attr2]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2]]                          |
+-----------+-----------+----------------------------------------------------+


scala> df1.printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)


scala> df2.show(false)
+------------+--------------------------+
|Agreement_A1|Lines                     |
+------------+--------------------------+
|Agr1_Attr1  |[[A1At1Line1, A1At1Line2]]|
|Agr3_Attr1  |[[A3At1Line1, A3At1Line2]]|
|Agr4_Attr1  |[[A4At1Line1, A4At1Line2]]|
|Agr5_Attr1  |[[A5At1Line1, A5At1Line2]]|
|Agr6_Attr1  |[[A6At1Line1, A6At1Line2]]|
+------------+--------------------------+


scala> df2.printSchema
root
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Line_A1: string (nullable = true)
|    |    |-- Line_A2: string (nullable = true)
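
For anyone who wants to reproduce this, here is one way the two sample dataframes above could be built in spark-shell. This is only a sketch: the case class names (Agreement, Activity, Line, AgreementLines) are made up for illustration and do not come from the question; only the field names and values are taken from the output shown above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case classes; only the field names come from the printed schemas.
case class Agreement(Agreement_A1: String, Agreement_A2: String)
case class Activity(Activity_A1: String, Activity_A2: String, Details: Seq[Agreement])
case class Line(Line_A1: String, Line_A2: String)
case class AgreementLines(Agreement_A1: String, Lines: Seq[Line])

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// First dataframe: activities, each with a nested array of agreements.
val df1 = Seq(
  Activity("Act1_Attr1", "Act1_Attr2",
    Seq(Agreement("Agr2_Attr1", "Agr2_Attr2"), Agreement("Agr1_Attr1", "Agr1_Attr2"))),
  Activity("Act2_Attr1", "Act2_Attr2",
    Seq(Agreement("Agr4_Attr1", "Agr4_Attr2"), Agreement("Agr3_Attr1", "Agr3_Attr2"))),
  Activity("Act3_Attr1", "Act3_Attr2",
    Seq(Agreement("Agr5_Attr1", "Agr5_Attr2")))
).toDF()

// Second dataframe: one row per agreement key, each with a nested array of lines.
// There is deliberately no row for Agr2_Attr1, matching the sample data.
val df2 = Seq(1, 3, 4, 5, 6).map { n =>
  AgreementLines(s"Agr${n}_Attr1", Seq(Line(s"A${n}At1Line1", s"A${n}At1Line2")))
}.toDF()
```

In a compiled application (rather than spark-shell) the case classes would need to be declared at the top level, outside the method that calls toDF().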


scala> val outputDF = df1.withColumn("DetailsExploded", explode($"Details")).join(
    |   df2, $"DetailsExploded.Agreement_A1" === $"Agreement_A1").withColumn(
    |     "DetailsWithAgreementA1Lines", struct($"DetailsExploded.Agreement_A1" as "Agreement_A1", $"DetailsExploded.Agreement_A2" as "Agreement_A2", $"Lines"))
outputDF: org.apache.spark.sql.DataFrame = [Activity_A1: string, Activity_A2: string ... 5 more fields]

scala> outputDF.show(false)
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
|Activity_A1|Activity_A2|Details                                             |DetailsExploded         |Agreement_A1|Lines                     |DetailsWithAgreementA1Lines                         |
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1, Agr2_Attr2], [Agr1_Attr1, Agr1_Attr2]]|[Agr1_Attr1, Agr1_Attr2]|Agr1_Attr1  |[[A1At1Line1, A1At1Line2]]|[Agr1_Attr1, Agr1_Attr2, [[A1At1Line1, A1At1Line2]]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|[Agr3_Attr1, Agr3_Attr2]|Agr3_Attr1  |[[A3At1Line1, A3At1Line2]]|[Agr3_Attr1, Agr3_Attr2, [[A3At1Line1, A3At1Line2]]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|[Agr4_Attr1, Agr4_Attr2]|Agr4_Attr1  |[[A4At1Line1, A4At1Line2]]|[Agr4_Attr1, Agr4_Attr2, [[A4At1Line1, A4At1Line2]]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2]]                          |[Agr5_Attr1, Agr5_Attr2]|Agr5_Attr1  |[[A5At1Line1, A5At1Line2]]|[Agr5_Attr1, Agr5_Attr2, [[A5At1Line1, A5At1Line2]]]|
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+


scala> outputDF.printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)
|-- DetailsExploded: struct (nullable = true)
|    |-- Agreement_A1: string (nullable = true)
|    |-- Agreement_A2: string (nullable = true)
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Line_A1: string (nullable = true)
|    |    |-- Line_A2: string (nullable = true)
|-- DetailsWithAgreementA1Lines: struct (nullable = false)
|    |-- Agreement_A1: string (nullable = true)
|    |-- Agreement_A2: string (nullable = true)
|    |-- Lines: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- Line_A1: string (nullable = true)
|    |    |    |-- Line_A2: string (nullable = true)



scala> outputDF.groupBy("Activity_A1", "Activity_A2").agg(collect_list($"DetailsWithAgreementA1Lines") as "Details").show(false)
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
|Activity_A1|Activity_A2|Details                                                                                                     |
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr1_Attr1, Agr1_Attr2, [[A1At1Line1, A1At1Line2]]]]                                                      |
|Act2_Attr1 |Act2_Attr2 |[[Agr3_Attr1, Agr3_Attr2, [[A3At1Line1, A3At1Line2]]], [Agr4_Attr1, Agr4_Attr2, [[A4At1Line1, A4At1Line2]]]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2, [[A5At1Line1, A5At1Line2]]]]                                                      |
+-----------+-----------+------------------------------------------------------------------------------------------------------------+


scala> outputDF.groupBy("Activity_A1", "Activity_A2").agg(collect_list($"DetailsWithAgreementA1Lines") as "Details").printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)
|    |    |-- Lines: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Line_A1: string (nullable = true)
|    |    |    |    |-- Line_A2: string (nullable = true)
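
If you would rather keep agreements that have no match in the second dataframe (Agr2_Attr1 in this data), the same explode / join / struct / collect_list steps could be wrapped in a small helper with a configurable join type. This is a rough sketch that assumes the column names from this question; nestLines and its joinType parameter are names made up here, not part of Spark.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, explode, struct}

// Hypothetical helper. Assumes (Activity_A1, Activity_A2) identifies a row of
// `activities`, and that `agreementLines` has top-level Agreement_A1 and Lines.
def nestLines(
    activities: DataFrame,
    agreementLines: DataFrame,
    joinType: String = "inner"): DataFrame =
  activities
    // One row per array element; rows with a null or empty Details are dropped.
    .withColumn("Detail", explode(col("Details")))
    // Match each exploded agreement against the second dataframe's key column.
    .join(
      agreementLines,
      col("Detail.Agreement_A1") === agreementLines("Agreement_A1"),
      joinType)
    // Rebuild the nested array, now with Lines inside each element.
    .groupBy(col("Activity_A1"), col("Activity_A2"))
    .agg(
      collect_list(
        struct(
          col("Detail.Agreement_A1").as("Agreement_A1"),
          col("Detail.Agreement_A2").as("Agreement_A2"),
          col("Lines"))).as("Details"))
```

Calling nestLines(df1, df2) should behave like the inner join above, while nestLines(df1, df2, "left") keeps unmatched agreements with Lines set to null. Keep in mind that collect_list gives no ordering guarantee after a shuffle, so the elements of Details may come back in a different order than in the input (as happened for Act2_Attr1 above).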

Upvotes: 4
