Reputation: 1354
I have two tables: a p_to_v mapping and a g_to_v mapping.
scala> val p_to_v = Seq(("p1", "v1"), ("p1", "v2"), ("p2", "v1")).toDF("p", "v")
scala> p_to_v.show
+---+---+
| p| v|
+---+---+
| p1| v1|
| p1| v2|
| p2| v1|
+---+---+
"p1" is mapped to [v1, v2]
"p2" is mapped to [v1]
scala> val g_to_v = Seq(("g1", "v1"), ("g2", "v1"), ("g2", "v2"), ("g3", "v2")).toDF("g", "v")
scala> g_to_v.show
+---+---+
| g| v|
+---+---+
| g1| v1|
| g2| v1|
| g2| v2|
| g3| v2|
+---+---+
"g1" is mapped to [v1]
"g2" is mapped to [v1,v2]
"g3" is mapped to [v2]
I want all combinations of p and g for which the set of v values mapped to p is a subset of the set of v values mapped to g:
p1 [v1, v2] - g2 [v1, v2]
p2 [v1] - g1 [v1]
p2 [v1] - g2 [v1, v2]
How can I get this result?
Upvotes: 0
Views: 278
Reputation: 812
This is pretty straightforward: aggregate each table with groupBy, then cross join the two results and keep only the pairs that pass a subset check.
scala> val p_to_v = Seq(("p1", "v1"), ("p1", "v2"), ("p2", "v1")).toDF("p", "v")
p_to_v: org.apache.spark.sql.DataFrame = [p: string, v: string]
scala> val g_to_v = Seq(("g1", "v1"), ("g2", "v1"), ("g2", "v2"), ("g3", "v2")).toDF("g", "v")
g_to_v: org.apache.spark.sql.DataFrame = [g: string, v: string]
Now group each table by its key and collect the v values into a list:
scala> val pv = p_to_v.groupBy($"p").agg(collect_list("v").as("pv"))
pv: org.apache.spark.sql.DataFrame = [p: string, pv: array<string>]
scala> val gv = g_to_v.groupBy($"g").agg(collect_list("v").as("gv"))
gv: org.apache.spark.sql.DataFrame = [g: string, gv: array<string>]
scala> pv.show
+---+--------+
| p| pv|
+---+--------+
| p2| [v1]|
| p1|[v1, v2]|
+---+--------+
scala> gv.show
+---+--------+
| g| gv|
+---+--------+
| g2|[v2, v1]|
| g3| [v2]|
| g1| [v1]|
+---+--------+
Create a UDF that checks whether one list is a subset of the other
import org.apache.spark.sql.functions._
// The collected columns are array<string>, so the UDF takes Seq[String] and compares them as sets.
def subLis(ar1: Seq[String], ar2: Seq[String]) = ar1.toSet.subsetOf(ar2.toSet)
subLis: (ar1: Seq[String], ar2: Seq[String])Boolean
val subLisUDF = udf(subLis _)
UserDefinedFunction(<function2>,BooleanType,Some(List(ArrayType(StringType,true), ArrayType(StringType,true))))
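If it helps to see what the check does outside Spark, here is a minimal sketch on plain Scala collections. The name isSubset and the three sample values are only illustrative, not part of the answer's code. Converting to a Set first is what makes the comparison ignore element order and duplicates, which matters because collect_list does not guarantee any order (note that gv comes back as [v2, v1] above).

```scala
// Stand-alone version of the subset check on plain Scala collections (no Spark needed).
// isSubset is a hypothetical helper name used only for this illustration.
def isSubset(small: Seq[String], big: Seq[String]): Boolean =
  small.toSet.subsetOf(big.toSet)

// Same elements in a different order still count as a subset.
val orderIgnored = isSubset(Seq("v1", "v2"), Seq("v2", "v1"))

// Duplicates collapse when converting to a Set.
val duplicatesIgnored = isSubset(Seq("v1", "v1"), Seq("v1"))

// v1 is missing from the second list, so this is not a subset.
val notContained = isSubset(Seq("v1", "v2"), Seq("v2"))

println(s"$orderIgnored $duplicatesIgnored $notContained")
```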
Now you can perform a cross join & apply the UDF
spark.conf.set("spark.sql.crossJoin.enabled", "true")
pv.join(gv).withColumn("newdsa", subLisUDF($"pv", $"gv")).filter($"newdsa").show
+---+--------+---+--------+------+
| p| pv| g| gv|newdsa|
+---+--------+---+--------+------+
| p2| [v1]| g2|[v2, v1]| true|
| p1|[v1, v2]| g2|[v2, v1]| true|
| p2| [v1]| g1| [v1]| true|
+---+--------+---+--------+------+
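Or pass the check as the join condition. The === comparison is optional, since the UDF already returns true for equal sets: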
pv.join(gv, pv("pv") === gv("gv") || subLisUDF($"pv", $"gv")).show
+---+--------+---+--------+
| p| pv| g| gv|
+---+--------+---+--------+
| p2| [v1]| g2|[v1, v2]|
| p1|[v1, v2]| g2|[v1, v2]|
| p2| [v1]| g1| [v1]|
+---+--------+---+--------+
Try both and take the best performing one.
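A further variant worth benchmarking, assuming you are on Spark 2.4 or later, is to drop the UDF and use the built-in array_except function: pv is a subset of gv exactly when removing gv's elements from pv leaves an empty array. The sketch below is a self-contained program rather than a REPL session. The object name SubsetJoinSketch, the method subsetPairs, and the local master setting are my own placeholders, and collect_set is swapped in for collect_list so duplicates are dropped up front.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_except, collect_set, size}

// Hypothetical wrapper object for the sketch; assumes Spark 2.4+ on the classpath.
object SubsetJoinSketch {

  // Returns every (p, g) pair where p's v values are a subset of g's v values.
  def subsetPairs(spark: SparkSession): Seq[(String, String)] = {
    import spark.implicits._

    val pToV = Seq(("p1", "v1"), ("p1", "v2"), ("p2", "v1")).toDF("p", "v")
    val gToV = Seq(("g1", "v1"), ("g2", "v1"), ("g2", "v2"), ("g3", "v2")).toDF("g", "v")

    // One row per key, with the distinct v values collected into an array.
    val pv = pToV.groupBy($"p").agg(collect_set($"v").as("pv"))
    val gv = gToV.groupBy($"g").agg(collect_set($"v").as("gv"))

    // Explicit cross join, so spark.sql.crossJoin.enabled does not need to be set.
    // Keep a pair when nothing is left of pv after removing gv's elements.
    pv.crossJoin(gv)
      .filter(size(array_except($"pv", $"gv")) === 0)
      .select("p", "g")
      .as[(String, String)]
      .collect()
      .toSeq
      .sorted
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("subset-join-sketch")
      .getOrCreate()
    subsetPairs(spark).foreach(println)
    spark.stop()
  }
}
```

Built-in functions stay visible to the optimizer, whereas a UDF is a black box to it, so this version may plan better. It is still a cross join underneath, so measure it on your own data before settling on it.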
Upvotes: 1