L. Chu

Reputation: 133

Handling Join ON OR in Spark

I have a data frame like so:

+---+---+---+---+---+
|AId| A1| A2| A3| A4|
+---+---+---+---+---+
|  1|  *|  a|  b|  c|
|  2|  *|  *|  b|  c|
|  3|  c|  a|  b|  c|
|  4|  *|  *|  *|  c|
|  5|  *|  *|  *|  *|
+---+---+---+---+---+

that I would like to join on:

+---+---+---+---+---+----+
|BId| B1| B2| B3| B4|Code|
+---+---+---+---+---+----+
|  1|  c|  a|  b|  c|  AO|
|  2|  b|  a|  b|  c|  AS|
|  3|  b|  b|  b|  c|  AT|
|  4|  a|  d|  d|  c|  BO|
|  5|  d|  a|  c|  b|  BS|
|  6|  a|  b|  b|  c|  BT|
|  7|  d|  d|  d|  c|  CO|
|  8|  d|  d|  d|  d|  CS|
+---+---+---+---+---+----+

to match each AId with a Code. However, * is a wildcard: it matches anything. In the example above, AId == 1 matches BId 1 and 2, AId == 3 matches only BId 1, AId == 4 matches everything except BId 5 and 8, and AId == 5 matches all 8 rows.
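The matching rule can be checked with a plain-Python sketch (a hypothetical helper, not Spark code) using the sample tables above:

```python
# A rows: AId -> (A1, A2, A3, A4); "*" is a wildcard.
a_rows = {
    1: ("*", "a", "b", "c"),
    2: ("*", "*", "b", "c"),
    3: ("c", "a", "b", "c"),
    4: ("*", "*", "*", "c"),
    5: ("*", "*", "*", "*"),
}

# B rows: BId -> (B1, B2, B3, B4); no wildcards here.
b_rows = {
    1: ("c", "a", "b", "c"),
    2: ("b", "a", "b", "c"),
    3: ("b", "b", "b", "c"),
    4: ("a", "d", "d", "c"),
    5: ("d", "a", "c", "b"),
    6: ("a", "b", "b", "c"),
    7: ("d", "d", "d", "c"),
    8: ("d", "d", "d", "d"),
}

def matches(a, b):
    # Every column must either be a wildcard in A or equal the B value.
    return all(x == "*" or x == y for x, y in zip(a, b))

match_table = {aid: sorted(bid for bid, b in b_rows.items() if matches(a, b))
               for aid, a in a_rows.items()}
```

This reproduces the matches described above, e.g. `match_table[1] == [1, 2]` and `match_table[5]` contains all eight BIds.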

What would be the best way to approach this? The query seems expensive in Spark, and Spark doesn't seem to support OR in join conditions. The alternative seems to be a case-when over A1-A4 to set a flag, and then go back and join. A tricky point is that the wildcards can appear 1-4 times in the first table, in any column, though they do not appear in the second.

Upvotes: 0

Views: 205

Answers (1)

zero323

Reputation: 330203

You can express the join condition as:

(A1 = '*' OR A1 = B1) AND (A2 = '*' OR A2 = B2) AND ... AND (AN = '*' OR AN = BN)

In PySpark the equivalent expression can be generated, for example, like this:

from pyspark.sql.functions import col
from functools import reduce
from operator import and_

expr = reduce(
    and_,
    ((col("A{}".format(i)) == "*") | (col("A{}".format(i)) == col("B{}".format(i)))
     for i in range(1, 5)))

expr
# Column<b'(((((A1 = *) OR (A1 = B1)) AND ((A2 = *) OR (A2 = B2))) AND ((A3 = *) OR (A3 = B3))) AND ((A4 = *) OR (A4 = B4)))'>

and use it with crossJoin:

a.crossJoin(b).where(expr)

or, after enabling cross joins:

spark.conf.set("spark.sql.crossJoin.enabled", "true")

a.join(b, expr)

Unfortunately this is quite expensive, due to the Cartesian product. With a small number of columns (4 is probably a borderline case) you could try to generate the power set of columns and create optimized plans, but obviously this won't scale to a larger number of columns.
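The power-set idea can be sketched in plain Python (hypothetical data structures, not a Spark plan): group A rows by which columns are non-wildcard, then for each group run an equi-join on just those columns. Each of these joins can be a hash join instead of a Cartesian product.

```python
from collections import defaultdict

# Tiny sample: AId -> (A1..A4), BId -> (B1..B4).
a_rows = {1: ("*", "a", "b", "c"), 4: ("*", "*", "*", "c")}
b_rows = {1: ("c", "a", "b", "c"), 5: ("d", "a", "c", "b"), 7: ("d", "d", "d", "c")}

# Group A ids by their wildcard mask (indices of non-wildcard columns).
masks = defaultdict(list)
for aid, a in a_rows.items():
    key = tuple(i for i, v in enumerate(a) if v != "*")
    masks[key].append(aid)

pairs = []
for key, aids in masks.items():
    # Hash-index B on the non-wildcard columns for this mask.
    index = defaultdict(list)
    for bid, b in b_rows.items():
        index[tuple(b[i] for i in key)].append(bid)
    # Probe with each A row's values in those same columns.
    for aid in aids:
        probe = tuple(a_rows[aid][i] for i in key)
        for bid in index.get(probe, []):
            pairs.append((aid, bid))
```

In Spark terms, each mask group would become a separate equi-join (hence an optimized plan) and the results would be unioned; with N columns there are up to 2^N masks, which is why it won't scale beyond a handful of columns.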

Upvotes: 1
