artyomboyko

Reputation: 2871

Spark complex grouping

I have this data structure in Spark:

val df = Seq(
("Package 1", Seq("address1", "address2", "address3")),
("Package 2", Seq("address3", "address4", "address5", "address6")),
("Package 3", Seq("address7", "address8")),
("Package 4", Seq("address9")),
("Package 5", Seq("address9", "address1")),
("Package 6", Seq("address10")),
("Package 7", Seq("address8"))).toDF("Package", "Destinations")
df.show(20, false)

I need to find all the addresses that were seen together across different packages, but I can't find an efficient way to do it. I've tried grouping, mapping, etc. Ideally, the result for the given df would be:

+----+------------------------------------------------------------------------+
| Id |                               Addresses                                |
+----+------------------------------------------------------------------------+
|  1 | [address1, address2, address3, address4, address5, address6, address9] |
|  2 | [address7, address8]                                                   |
|  3 | [address10]                                                            |
+----+------------------------------------------------------------------------+

Upvotes: 0

Views: 121

Answers (1)

WestCoastProjects

Reputation: 63231

Look into using treeReduce: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/rdd/RDD.html#treeReduce(scala.Function2,%20int)

  • For the sequential operation, build a Set of Sets:

    • For each new array of elements, e.g. [address7, address8], iterate through the existing Sets to check whether the intersection is non-empty: if so, add those elements to that Set;

    • otherwise create a new Set containing those elements.

  • For the combine operation:

    • For each Set on the left side of the combine, iterate through all Sets on the right side to find any with a non-empty intersection;

    • if a non-empty intersection is found, merge the two Sets.

Note: Spark also provides treeAggregate, which takes a zero value plus separate sequential and combine functions, so it matches the two operations described above directly.
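The two operations above can be sketched with plain Scala collections. This is a minimal sketch, not a tested Spark job: `seqOp` and `combOp` are hypothetical names for the functions you would pass to something like `rdd.treeAggregate(Set.empty[Set[String]])(AddressGrouping.seqOp, AddressGrouping.combOp)` over an RDD of each package's `Destinations` array.

```scala
object AddressGrouping {
  // Sequential operation: fold one package's addresses into the running
  // set of groups. Every existing group that shares at least one address
  // with the new package is merged together with the new addresses, so a
  // package that bridges two groups collapses them into one.
  def seqOp(groups: Set[Set[String]], addrs: Seq[String]): Set[Set[String]] = {
    val incoming = addrs.toSet
    val (overlapping, disjoint) = groups.partition(_.intersect(incoming).nonEmpty)
    disjoint + overlapping.foldLeft(incoming)(_ union _)
  }

  // Combine operation: merge the right partition's groups into the left's.
  // Folding each right-side group in via seqOp reuses the same
  // non-empty-intersection merge logic described above.
  def combOp(left: Set[Set[String]], right: Set[Set[String]]): Set[Set[String]] =
    right.foldLeft(left)((acc, group) => seqOp(acc, group.toSeq))
}
```

On the question's data this yields three groups: {address1..address6, address9}, {address7, address8}, and {address10}. Because `combOp` re-runs the merge check, groups that only become connected across partitions are still joined correctly.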

Upvotes: 2
