Reputation: 23
I have two DataFrames. I want to select only those devices from the first DataFrame that fulfill the conditions described under Expected Output below.
I can do this with a UDF and some loops, but I would prefer to do it with joins. Any hints appreciated.
DevicesDS:
| DeviceId | Pattern |
| -------- | ---------- |
| D1 | Dr_123_5.0 |
| D2 | Dr_456_6.0 |
| D3 | Ap_111_3.5 |
| D1 | Ap_333_4.5 |
| D2 | OE_222_7.7 |
| D4 | Dr_123_5.0 |
PatternDS:
| Pat1 | Pat2 |
| --------------| -------------- |
|Dr_123_5.0 | Ap_333_4.5 |
|NA | OE_222_7.7 |
|Ap_111_3.5 | NA |
val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))
This gives me two columns from `DevicesDS`: the first is `DeviceId` and the second is a `collect_set` of that device's patterns.
Now I need to apply a join.
Expected Output:
- `D1` has both `Pat1` (Dr_123_5.0) and `Pat2` (Ap_333_4.5) from the same row, so it should be included.
- `D2` has `Pat2` (OE_222_7.7), and `Pat1` for that row is `NA`, so it should be included.
- `D3` has `Pat1` (Ap_111_3.5), and `Pat2` for that row is `NA`, so it should be included.
- `D4` has `Pat1` (Dr_123_5.0) from row #1 but does not have that row's `Pat2`, so it is not eligible.

So the final result is:
| DeviceId | Patterns |
| -------- | ---------- |
| D1 | array of Patterns |
| D2 | array of Patterns |
| D3 | array of Patterns |
Note that `D4` is not in this list because it did not meet the criteria. `Patterns` holds the array of matching patterns.
Upvotes: 2
Views: 199
Reputation: 32660
Assuming these are your input dataframes:
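// Already in scope in spark-shell; in an application, import these explicitly.
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`

val DevicesDS = Seq(
  ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
  ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")
val PatternDS = Seq(
  ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"), ("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")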
First, group `DevicesDS` by `DeviceId` to get the set of patterns associated with each device:
val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))
DevicesDSGrouped.show(false)
//+--------+------------------------+
//|deviceId|Patterns |
//+--------+------------------------+
//|D1 |[Dr_123_5.0, Ap_333_4.5]|
//|D3 |[Ap_111_3.5] |
//|D2 |[OE_222_7.7, Dr_456_6.0]|
//|D4 |[Dr_123_5.0] |
//+--------+------------------------+
Then, join with `PatternDS`, using the `array_except` function in the join condition to check that either both patterns match, or one matches and the other is `NA`. Finally, group by `DeviceId` and collect the `Pat1` and `Pat2` columns, like this:
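// A pattern row matches a device when, after dropping "NA", none of its patterns is missing from the device's set.
val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0
val result = DevicesDSGrouped.join(PatternDS, joinCondition)
  .groupBy("deviceId")
  .agg(
    // Flatten the matched (Pat1, Pat2) pairs into one array and drop the "NA" placeholders.
    array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
  )
result.show(false)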
//+--------+------------------------+
//|deviceId|Patterns |
//+--------+------------------------+
//|D1 |[Dr_123_5.0, Ap_333_4.5]|
//|D3 |[Ap_111_3.5] |
//|D2 |[OE_222_7.7] |
//+--------+------------------------+
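To see why the join condition works, it may help to look at the same rule without Spark. The following is only a sketch using plain Scala collections and the sample data from the question; the object name `PatternMatchSketch` and the helpers `required` and `rowMatches` are made up for illustration and are not part of any Spark API. Each step is labelled with the DataFrame operation it is meant to mirror.

```scala
object PatternMatchSketch {
  // Sample data copied from the question.
  val devices: Seq[(String, String)] = Seq(
    "D1" -> "Dr_123_5.0", "D2" -> "Dr_456_6.0", "D3" -> "Ap_111_3.5",
    "D1" -> "Ap_333_4.5", "D2" -> "OE_222_7.7", "D4" -> "Dr_123_5.0"
  )
  val patternRows: Seq[(String, String)] = Seq(
    "Dr_123_5.0" -> "Ap_333_4.5", "NA" -> "OE_222_7.7", "Ap_111_3.5" -> "NA"
  )

  // Mirrors groupBy("deviceId") + collect_set: one set of patterns per device.
  val patternsByDevice: Map[String, Set[String]] =
    devices.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }

  // Mirrors array_remove(array(Pat1, Pat2), "NA"): the patterns a row actually requires.
  def required(row: (String, String)): Seq[String] =
    Seq(row._1, row._2).filterNot(_ == "NA")

  // Mirrors size(array_except(required, Patterns)) === 0:
  // no required pattern is missing from the device's set.
  def rowMatches(row: (String, String), devicePatterns: Set[String]): Boolean =
    required(row).forall(devicePatterns.contains)

  // Mirrors the inner join: keep every (device, pattern row) pair that satisfies the condition.
  val joined: Seq[(String, (String, String))] =
    for {
      (id, pats) <- patternsByDevice.toSeq
      row <- patternRows
      if rowMatches(row, pats)
    } yield id -> row

  // Mirrors groupBy + flatten(collect_list(...)) + array_remove(..., "NA").
  val result: Map[String, Seq[String]] =
    joined.groupBy(_._1).map { case (id, pairs) => id -> pairs.flatMap(p => required(p._2)) }
}
```

With this data, `PatternMatchSketch.result` contains entries for `D1`, `D2` and `D3` only; `D4` is dropped because row 1 also requires `Ap_333_4.5`, which `D4` does not have. Note that `array_except`, `array_remove` and `flatten` in the Spark version were added in Spark 2.4, so the join-based answer needs at least that version.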
Upvotes: 2