PharmDataSci

Reputation: 117

PySpark: how to get all combinations of columns

I have a DataFrame of batches, inputs and outputs, and I would like to add an identifier for each unique combination of inputs back to the DataFrame. A simplified version of the data looks like this:

Batch Output Input
1 A X
1 A Y
1 A Z
2 A X
2 A Y
2 A Z
3 A V
3 A Y
3 A Z
4 A W
4 A Y
4 A Z

As you can see, there are 4 batches and 3 different combinations of inputs that make the same output type. What I would like to end up with is:

Batch Output Input Combination
1 A X 1
1 A Y 1
1 A Z 1
2 A X 1
2 A Y 1
2 A Z 1
3 A V 2
3 A Y 2
3 A Z 2
4 A W 3
4 A Y 3
4 A Z 3
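
The numbering in the table above can be sketched in plain Python before moving to Spark. This is only an illustration of the intended logic, not a PySpark solution; the names `rows`, `inputs_per_batch`, `combination_ids` and `labelled` are made up for the example, and it assumes combinations should be numbered in order of first appearance by batch.

```python
from collections import defaultdict

# The sample data from the question as (Batch, Output, Input) tuples.
rows = [
    (1, "A", "X"), (1, "A", "Y"), (1, "A", "Z"),
    (2, "A", "X"), (2, "A", "Y"), (2, "A", "Z"),
    (3, "A", "V"), (3, "A", "Y"), (3, "A", "Z"),
    (4, "A", "W"), (4, "A", "Y"), (4, "A", "Z"),
]

# Step 1: collect the set of inputs used by each (Batch, Output) pair.
inputs_per_batch = defaultdict(set)
for batch, output, inp in rows:
    inputs_per_batch[(batch, output)].add(inp)

# Step 2: number each distinct (Output, input set) in order of first appearance.
combination_ids = {}
for batch_key in sorted(inputs_per_batch):
    output = batch_key[1]
    key = (output, frozenset(inputs_per_batch[batch_key]))
    combination_ids.setdefault(key, len(combination_ids) + 1)

# Step 3: attach the combination number to every original row.
labelled = [
    (batch, output, inp,
     combination_ids[(output, frozenset(inputs_per_batch[(batch, output)]))])
    for batch, output, inp in rows
]

for row in labelled:
    print(row)
```

Using a `frozenset` as the dictionary key is what makes two batches with the same inputs in a different order map to the same number.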

I am looking to implement this in PySpark for further data manipulation. Any guidance would be appreciated :)

EDIT: still inelegant, but it works in PySpark! I am sure there must be a much easier way to do this using either sets or dictionaries; my brain just refuses to let me see it...

df = spark.createDataFrame(
    [
        (1,'A','X'),
        (1,'A','Y'),
        (1,'A','Z'),
        (2,'A','X'),
        (2,'A','Y'),
        (2,'A','Z'),
        (3,'A','V'),
        (3,'A','Y'),
        (3,'A','Z'),
        (4,'A','W'),
        (4,'A','Y'),
        (4,'A','Z'),
        (5,'B','X'),
        (5,'B','Y'),
        (5,'B','Z')
    ],
    ["Batch", "Output", "Input"]
)

from pyspark.sql import Window
from pyspark.sql import functions as f

# One row per (Batch, Output) with its sorted inputs joined into a key, e.g. "X_Y_Z"
grouped = df.groupBy("Batch", "Output").agg(f.concat_ws('_', f.sort_array(f.collect_list("Input"))).alias("Comb"))
grouped = grouped.withColumn("TotalComb", f.concat_ws('_', grouped.Output, grouped.Comb))
# Number the distinct keys in alphabetical order of TotalComb
w = Window.orderBy(f.col('TotalComb').asc())
groupunique = grouped.select("TotalComb").distinct().withColumn("UniqueComb", f.row_number().over(w))
connected = df.join(grouped, on=["Batch", "Output"], how="left").join(groupunique, on=["TotalComb"], how="left")

Upvotes: 2

Views: 697

Answers (1)

wwnde

Reputation: 26676

Create a list of inputs per batch, partition by that list, flag the first row of each distinct list, and cumulatively sum those flags over the entire DataFrame.

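from pyspark.sql import Window
from pyspark.sql.functions import collect_set, lag, sort_array, sum as sum_, when

w = Window.partitionBy("Batch", "Output").orderBy("Batch")

# Sort the collected inputs so identical sets always compare equal
df1 = (df.withColumn('Combination', sort_array(collect_set("Input").over(w)))
       # Flag the first row of each (Combination, Output) group, then take a running sum by Batch
       .withColumn('Combination', sum_(when(lag('Output').over(Window.partitionBy("Combination", 'Output').orderBy("Batch")).isNull(), 1)
       .otherwise(0)).over(Window.partitionBy().orderBy('Batch'))))
df1.show()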

+-----+------+-----+-----------+
|Batch|Output|Input|Combination|
+-----+------+-----+-----------+
|    1|     A|    X|          1|
|    1|     A|    Y|          1|
|    1|     A|    Z|          1|
|    2|     A|    X|          1|
|    2|     A|    Y|          1|
|    2|     A|    Z|          1|
|    3|     A|    V|          2|
|    3|     A|    Y|          2|
|    3|     A|    Z|          2|
|    4|     A|    W|          3|
|    4|     A|    Y|          3|
|    4|     A|    Z|          3|
|    5|     B|    X|          4|
|    5|     B|    Y|          4|
|    5|     B|    Z|          4|
+-----+------+-----+-----------+

Upvotes: 1
