Shishir

How to join two DataFrames where a column matches two columns in the second DataFrame?

I have two DataFrames. I want to select only those devices from the first DataFrame (DevicesDS) that fulfill these conditions:

  1. If a PatternDS row has patterns in both Pat1 and Pat2, a device matches only if it has both of them.
  2. If either Pat1 or Pat2 is NA in a PatternDS row, a device matches if it has the pattern on the other side.

I can do this with a UDF and some loops, but I would like to do it with joins. Any hints are appreciated.

DevicesDS:

| DeviceId | Pattern    |
| -------- | ---------- |
| D1       | Dr_123_5.0 |
| D2       | Dr_456_6.0 |
| D3       | Ap_111_3.5 |
| D1       | Ap_333_4.5 |
| D2       | OE_222_7.7 |
| D4       | Dr_123_5.0 |

PatternDS:

| Pat1       | Pat2       |
| ---------- | ---------- |
| Dr_123_5.0 | Ap_333_4.5 |
| NA         | OE_222_7.7 |
| Ap_111_3.5 | NA         |

What I have tried so far:

```scala
val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))
```

This gives me two columns from DevicesDS: the first is DeviceId and the second is the collect_set of its patterns.

Now I need to join this with PatternDS.

Expected Output:

  1. D1 matches both Pat1 (Dr_123_5.0) and Pat2 (Ap_333_4.5) of row 1, so it should be included.
  2. D2 has Pat2 (OE_222_7.7), and Pat1 for that row is NA, so it should be included.
  3. D3 has Pat1 (Ap_111_3.5), and Pat2 for that row is NA, so it should be included.
  4. D4 has Pat1 (Dr_123_5.0) from row 1 but not the corresponding Pat2 (Ap_333_4.5), so it is not eligible.
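
As a sanity check on the rules above, here is a minimal plain-Scala sketch (no Spark) that applies them to the sample data with ordinary collections. The object name `EligibilitySketch`, the helpers `toOpt`, `rowMatches` and `isEligible`, and the choice that a row with NA in both columns matches nothing are assumptions made for illustration; they are not part of the question.

```scala
// Plain Scala sketch (no Spark) of the eligibility rules from the question.
// Assumption: the literal string "NA" marks a missing pattern, as in PatternDS.
object EligibilitySketch {
  // DevicesDS rows from the question: (DeviceId, Pattern)
  val devices: Seq[(String, String)] = Seq(
    ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D3", "Ap_111_3.5"),
    ("D1", "Ap_333_4.5"), ("D2", "OE_222_7.7"), ("D4", "Dr_123_5.0")
  )

  // PatternDS rows from the question: (Pat1, Pat2)
  val patternRows: Seq[(String, String)] = Seq(
    ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"), ("Ap_111_3.5", "NA")
  )

  // Turn the "NA" marker into None so the two rules can be pattern-matched
  def toOpt(p: String): Option[String] = if (p == "NA") None else Some(p)

  // Does a device (given its set of patterns) satisfy one PatternDS row?
  def rowMatches(patterns: Set[String], pat1: Option[String], pat2: Option[String]): Boolean =
    (pat1, pat2) match {
      case (Some(a), Some(b)) => patterns(a) && patterns(b) // rule 1: both must be present
      case (Some(a), None)    => patterns(a)                // rule 2: Pat2 is NA
      case (None, Some(b))    => patterns(b)                // rule 2: Pat1 is NA
      case (None, None)       => false                      // assumption: nothing to match
    }

  // A device is eligible if it satisfies at least one PatternDS row
  def isEligible(patterns: Set[String]): Boolean =
    patternRows.exists { case (p1, p2) => rowMatches(patterns, toOpt(p1), toOpt(p2)) }

  // Group patterns per device, keep eligible devices, return their ids sorted
  def eligibleDevices: Seq[String] = {
    val byDevice: Map[String, Set[String]] =
      devices.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }
    byDevice.filter { case (_, pats) => isEligible(pats) }.keys.toSeq.sorted
  }

  def main(args: Array[String]): Unit =
    println(eligibleDevices.mkString(", "))
}
```

Running it lists D1, D2 and D3, matching the expected output, while D4 fails every row because it lacks Ap_333_4.5, OE_222_7.7 and Ap_111_3.5.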

So the final result is:

| DeviceId | Patterns          |
| -------- | ----------------- |
| D1       | array of Patterns |
| D2       | array of Patterns |
| D3       | array of Patterns |

Note that D4 is not in this list because it does not meet the criteria. Patterns contains the array of matching patterns.

Answers (1)

blackbishop

Assuming these are your input dataframes:

```scala
// Assumes a SparkSession named `spark`, as in spark-shell; its implicits provide toDF and $"col"
import spark.implicits._

val DevicesDS = Seq(
  ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
  ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")

val PatternDS = Seq(
  ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"), ("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")
```

First, group DevicesDS by DeviceId to get the set of patterns associated with each device:

```scala
import org.apache.spark.sql.functions._

// One row per device; collect_set keeps only distinct patterns
val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))

DevicesDSGrouped.show(false)
//+--------+------------------------+
//|deviceId|Patterns                |
//+--------+------------------------+
//|D1      |[Dr_123_5.0, Ap_333_4.5]|
//|D3      |[Ap_111_3.5]            |
//|D2      |[OE_222_7.7, Dr_456_6.0]|
//|D4      |[Dr_123_5.0]            |
//+--------+------------------------+
```
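
For readers without a Spark session at hand, this plain-Scala sketch mimics the grouping step: `groupBy` followed by `.toSet` plays the role of `collect_set`, which keeps only distinct values and makes no ordering guarantee. The object name `CollectSetSketch` and the extra duplicated D1 row are hypothetical, added only to show that duplicates collapse. Since `collect_set` already deduplicates, the extra `groupBy("deviceId","Pattern").count()` pass from the question is not needed for this purpose.

```scala
// Plain Scala sketch of groupBy("deviceId") + collect_set($"Pattern")
object CollectSetSketch {
  val devices: Seq[(String, String)] = Seq(
    ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
    ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0"),
    ("D1", "Dr_123_5.0") // hypothetical duplicate row, not in the original data
  )

  // DeviceId -> distinct patterns; a Set, so order is not guaranteed (as with collect_set)
  val grouped: Map[String, Set[String]] =
    devices.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }

  def main(args: Array[String]): Unit =
    grouped.toSeq.sortBy(_._1).foreach { case (id, pats) =>
      println(s"$id -> ${pats.toSeq.sorted.mkString("[", ", ", "]")}")
    }
}
```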

Then, join with PatternDS using array_except in the join condition: after removing NA from [Pat1, Pat2], the remaining patterns minus the device's Patterns must be empty. This holds when both patterns match, or when one matches and the other is NA. Finally, group by DeviceId and collect Pat1 and Pat2, removing NA:

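```scala
// array_except, array_remove and flatten require Spark 2.4 or later.
// Condition: the non-NA patterns of a PatternDS row, minus the device's Patterns, must be empty,
// i.e. every required pattern of that row is present on the device.
val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0

// Keep each (device, pattern row) pair that satisfies the condition,
// then regroup by device and collect the matched patterns, dropping "NA".
val result = DevicesDSGrouped.join(PatternDS, joinCondition)
  .groupBy("deviceId")
  .agg(
    array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
  )

result.show(false)
//+--------+------------------------+
//|deviceId|Patterns                |
//+--------+------------------------+
//|D1      |[Dr_123_5.0, Ap_333_4.5]|
//|D3      |[Ap_111_3.5]            |
//|D2      |[OE_222_7.7]            |
//+--------+------------------------+
```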
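
The join-then-regroup logic can also be sketched with plain Scala collections to see why the condition works: for each PatternDS row, drop the NA entries, subtract the device's patterns (the role of `array_except`), and keep the pair only if nothing is left. This is a Spark-free illustration under stated assumptions; `JoinSketch` and `joinCondition` are hypothetical names, and Scala `Set` difference stands in for `array_except`.

```scala
// Plain Scala sketch mirroring the Spark pipeline:
//   join condition: size(array_except(array_remove(array(Pat1, Pat2), "NA"), Patterns)) === 0
//   aggregation:    array_remove(flatten(collect_list(array(Pat1, Pat2))), "NA")
object JoinSketch {
  val devices: Seq[(String, String)] = Seq(
    ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
    ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
  )
  val patternRows: Seq[(String, String)] = Seq(
    ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"), ("Ap_111_3.5", "NA")
  )

  // Equivalent of DevicesDSGrouped: DeviceId -> distinct patterns
  val grouped: Map[String, Set[String]] =
    devices.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }

  // Remove "NA" from (Pat1, Pat2), subtract the device's patterns, require nothing left over
  def joinCondition(patterns: Set[String], pat1: String, pat2: String): Boolean =
    ((Set(pat1, pat2) - "NA") -- patterns).isEmpty

  // Pair every device with every pattern row, keep matches, regroup and drop "NA"
  val result: Map[String, Seq[String]] = {
    val joined = for {
      (id, pats) <- grouped.toSeq
      (p1, p2)   <- patternRows
      if joinCondition(pats, p1, p2)
    } yield (id, Seq(p1, p2))
    joined.groupBy(_._1).map { case (id, rows) => id -> rows.flatMap(_._2).filterNot(_ == "NA") }
  }

  def main(args: Array[String]): Unit =
    result.toSeq.sortBy(_._1).foreach { case (id, pats) =>
      println(s"$id -> ${pats.mkString("[", ", ", "]")}")
    }
}
```

One consequence of this formulation: a PatternDS row with NA in both columns leaves an empty array, so the condition is true for every device. If such rows can occur, filtering them out first, for example with `PatternDS.filter(!($"Pat1" === "NA" && $"Pat2" === "NA"))`, would avoid that. Likewise, if a device matched several rows, `collect_list` would gather all of their patterns, possibly with repeats; wrapping the result in `array_distinct` (Spark 2.4+) would deduplicate it.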
