Rashmi Jadhao

Reputation: 1

Need a PySpark solution to generate a run order by checking dependencies among columns

I have a dataframe with these columns:

varid  questionkey
id1    qid1
id2    id1
id3    qid3
id4    id2

For this data I want to create a runorder column. The questionkey column can contain values from the varid column; when it does, that varid must execute first, before the varid that depends on it (e.g. id1 must run before id2).

Expected output:

varid  questionkey  runorder
id1    qid1         1
id2    id1          2
id3    qid3         1
id4    id2          3

What can we do to achieve this using PySpark?

I tried it using a case/when condition, but I was not able to perform the recursive part.

First I assigned runorder 1 to all varids:

# Assign a default runorder of 1, then rename the columns before joining
df_with_runorder = df.withColumn("runorder", lit(1)).withColumnRenamed("VarID", "VarID_a")
df_with_runorder = df_with_runorder.withColumnRenamed("QuestionKey", "QuestionKey_a") \
    .withColumnRenamed("runorder", "runorder_a")

Then I joined to check the dependency between varid and questionkey; whenever a match (dependency) was found, I incremented runorder by 1:

joined_df = df_with_runorder.alias("a") \
    .join(df.alias("b"), col("a.VarID_a") == col("b.QuestionKey"), "left") \
    .withColumn("updated_runorder",
                when(col("b.VarID").isNotNull(), col("runorder_a") + 1)
                .otherwise(col("runorder_a")))
joined_df.show()

This updates the runorder only once: it checks the immediate dependency and adds 1, but it does not follow the whole chain. For id4 there are two dependencies (id2, and then id1), but my join only sees that id4 depends on id2 and adds 1. Ideally it should also see that id2 depends on id1, first update id2 to 2, and then update id4 to 3.
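The chained update described above is essentially a topological depth: a varid's run order is 1 plus the run order of the varid it depends on (or 1 if its questionkey is not a varid at all). As a plain-Python sketch of that rule (the `run_orders` helper and its recursion are illustrative, not part of the question's code):

```python
def run_orders(rows):
    # Map each varid to the key it depends on
    deps = {varid: qkey for varid, qkey in rows}

    def order(varid, seen=()):
        dep = deps.get(varid)
        # Base case: the dependency is not a varid (or we hit a cycle)
        if dep not in deps or varid in seen:
            return 1
        # Recursive case: one level deeper than the varid we depend on
        return 1 + order(dep, seen + (varid,))

    return {varid: order(varid) for varid in deps}

rows = [("id1", "qid1"), ("id2", "id1"), ("id3", "qid3"), ("id4", "id2")]
print(run_orders(rows))  # {'id1': 1, 'id2': 2, 'id3': 1, 'id4': 3}
```

This driver-side recursion captures the rule, but it does not scale to a large dataframe; the answer below implements the same idea with iterative Spark joins.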

Upvotes: 0

Views: 69

Answers (1)

Islam Elbanna

Reputation: 1757

I think you need an iterative algorithm, so you can compute this dependency count like this:

from pyspark.sql.functions import col, lit, when

data = [("id1", "qid1"),
        ("id2", "id1"),
        ("id3", "qid3"),
        ("id4", "id2")]


df = spark.createDataFrame(data, ["id", "questionkey"])
df = df.withColumn('runOrder', lit(1)) # Set default order as 1 for all

index = 1
while True:
    # Varids whose runOrder was set in the previous pass
    qdf = df.filter(df.runOrder == index).select(df.id).alias("qdf")
    if qdf.count() == 0:
        # No records changed in the last iteration, so we are done
        break
    index += 1
    # Any row whose questionkey matches one of those varids depends on it,
    # so push its runOrder one level further down the chain
    df = df.join(qdf, df.questionkey == qdf.id, "left_outer")
    df = df.withColumn('runOrder',
                       when(col('qdf.id').isNotNull(), df.runOrder + 1)
                       .otherwise(df.runOrder)).drop(col('qdf.id'))
    df.cache()

df.orderBy('id').show()
+---+-----------+--------+
| id|questionkey|runOrder|
+---+-----------+--------+
|id1|       qid1|       1|
|id2|        id1|       2|
|id3|       qid3|       1|
|id4|        id2|       3|
+---+-----------+--------+

Upvotes: 0
