I have a DataFrame with combinations of batches, inputs and outputs, and I would like to be able to add their "unique combinations" back to the DataFrame. A simple representation of the data looks like this:
Batch | Output | Input |
---|---|---|
1 | A | X |
1 | A | Y |
1 | A | Z |
2 | A | X |
2 | A | Y |
2 | A | Z |
3 | A | V |
3 | A | Y |
3 | A | Z |
4 | A | W |
4 | A | Y |
4 | A | Z |
So as you can see, there are 4 batches and 3 different combinations of inputs that make the same output type. What I would like to end up with is:
Batch | Output | Input | Combination |
---|---|---|---|
1 | A | X | 1 |
1 | A | Y | 1 |
1 | A | Z | 1 |
2 | A | X | 1 |
2 | A | Y | 1 |
2 | A | Z | 1 |
3 | A | V | 2 |
3 | A | Y | 2 |
3 | A | Z | 2 |
4 | A | W | 3 |
4 | A | Y | 3 |
4 | A | Z | 3 |
I am looking to implement this in PySpark for further data manipulation; any guidance would be appreciated :)
EDIT: still inelegant, but it works in PySpark! I am sure there must be a much easier way to do this using either sets or dictionaries; my brain just refuses to let me see it... (a tighter dense_rank variant is sketched after the code below).
from pyspark.sql import functions as f
from pyspark.sql.window import Window

df = spark.createDataFrame(
    [
        (1, 'A', 'X'),
        (1, 'A', 'Y'),
        (1, 'A', 'Z'),
        (2, 'A', 'X'),
        (2, 'A', 'Y'),
        (2, 'A', 'Z'),
        (3, 'A', 'V'),
        (3, 'A', 'Y'),
        (3, 'A', 'Z'),
        (4, 'A', 'W'),
        (4, 'A', 'Y'),
        (4, 'A', 'Z'),
        (5, 'B', 'X'),
        (5, 'B', 'Y'),
        (5, 'B', 'Z'),
    ],
    ["Batch", "Output", "Input"],
)
# one string per (Batch, Output) listing its inputs in sorted order
# (sort_array fixes the order, so no pre-sort of df is needed)
grouped = df.groupBy("Batch", "Output").agg(
    f.concat_ws('_', f.sort_array(f.collect_list("Input"))).alias("Comb"))
grouped = grouped.withColumn("TotalComb", f.concat_ws('_', grouped.Output, grouped.Comb))
# number the distinct combination strings 1..n, then attach them back
w = Window.partitionBy().orderBy(f.col('TotalComb').asc())
groupunique = grouped.select("TotalComb").distinct().withColumn("UniqueComb", f.row_number().over(w))
connected = (df.join(grouped, on=["Batch", "Output"], how="left")
               .join(groupunique, on=["TotalComb"], how="left"))
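For what it's worth, the easier method hinted at above can be had with dense_rank: key each (Batch, Output) group by its sorted input set, and let dense_rank number the distinct keys directly, skipping the distinct + second join. A minimal sketch using the same imports (the unpartitioned window is assumed acceptable at this scale):

keyed = (df.groupBy("Batch", "Output")
           .agg(f.concat_ws('_', f.sort_array(f.collect_set("Input"))).alias("Comb"))
           .withColumn("Key", f.concat_ws('_', "Output", "Comb")))
# dense_rank gives equal keys equal numbers, counting 1, 2, 3, ...
keyed = keyed.withColumn("Combination", f.dense_rank().over(Window.orderBy("Key")))
result = df.join(keyed.select("Batch", "Output", "Combination"),
                 on=["Batch", "Output"], how="left")

As with the row_number version, which label each combination receives depends on the key ordering (alphabetical here); only which rows share a label matters.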
Upvotes: 2
Views: 697
Create the list of inputs for each (Batch, Output), classify rows by that list, flag where each new combination first appears, and cumulatively sum the flags over the entire df:
from pyspark.sql.functions import collect_set, lag, sum, when
from pyspark.sql.window import Window

w = Window.partitionBy("Batch", "Output").orderBy("Batch")
# flag the first row of each new (Combination, Output) pair, then running-sum the flags
df1 = (df.withColumn('Combination', collect_set("Input").over(w))
         .withColumn('Combination', sum(when(lag('Output').over(Window.partitionBy("Combination", 'Output').orderBy("Batch")).isNull(), 1)
                                        .otherwise(0)).over(Window.partitionBy().orderBy('Batch'))))
df1.show()
+-----+------+-----+-----------+
|Batch|Output|Input|Combination|
+-----+------+-----+-----------+
| 1| A| X| 1|
| 1| A| Y| 1|
| 1| A| Z| 1|
| 2| A| X| 1|
| 2| A| Y| 1|
| 2| A| Z| 1|
| 3| A| V| 2|
| 3| A| Y| 2|
| 3| A| Z| 2|
| 4| A| W| 3|
| 4| A| Y| 3|
| 4| A| Z| 3|
| 5| B| X| 4|
| 5| B| Y| 4|
| 5| B| Z| 4|
+-----+------+-----+-----------+
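One caveat worth flagging: Window.partitionBy() with no keys (used for the final running sum here, and for the row_number/dense_rank variants above) makes Spark move every row to a single partition and log a warning to that effect. That is harmless at this size, but on a large DataFrame you would rank the small set of distinct combination keys on their own and join the result back instead.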
Upvotes: 1