Reputation: 1
I have a dataframe with the following columns:
varid | questionkey |
---|---|
id1 | qid1 |
id2 | id1 |
id3 | qid3 |
id4 | id2 |
For this data I want to create a runorder column. The questionkey column contains values from the varid column, so a varid referenced in questionkey must execute before the row that depends on it (e.g. id1 must run before id2, and id2 before id4).
Expected output:
varid | questionkey | runorder |
---|---|---|
id1 | qid1 | 1 |
id2 | id1 | 2 |
id3 | qid3 | 1 |
id4 | id2 | 3 |
What can we do to achieve this using PySpark? I tried using a case/when condition, but I am not able to perform the recursive part of the operation.
from pyspark.sql.functions import col, lit, when

# Rename the columns before joining
df_with_runorder = (df.withColumn("runorder", lit(1))
                      .withColumnRenamed("VarID", "VarID_a")
                      .withColumnRenamed("QuestionKey", "QuestionKey_a")
                      .withColumnRenamed("runorder", "runorder_a"))

# Join to check the dependency between varid and questionkey values;
# if a match (dependency) is found, update runorder by adding 1 to it
joined_df = (df_with_runorder.alias("a")
             .join(df.alias("b"), col("a.VarID_a") == col("b.QuestionKey"), "left")
             .withColumn("updated_runorder",
                         when(col("VarID").isNotNull(), col("runorder_a") + 1)
                         .otherwise(col("runorder_a"))))
joined_df.show()
This updates the order only once: it checks the immediate dependency and adds 1 to runorder, but for id4 there is a chain of two dependencies (id4 depends on id2, which depends on id1). Ideally it should see that id2 also depends on id1, first update id2's runorder to 2, and only then update id4's to 3.
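In other words, the desired runorder is the length of each dependency chain, which is a recursive quantity that a single join cannot produce. As a minimal sketch of that recursive definition outside Spark (the `deps` dict and `run_order` helper are illustrative, not part of the question's code):

```python
# Map each varid to its questionkey, taken from the sample data
deps = {"id1": "qid1", "id2": "id1", "id3": "qid3", "id4": "id2"}

def run_order(varid):
    key = deps[varid]
    if key not in deps:        # external question key: base of the chain
        return 1
    return 1 + run_order(key)  # one level deeper than the row it depends on

print({v: run_order(v) for v in deps})
# -> {'id1': 1, 'id2': 2, 'id3': 1, 'id4': 3}
```

This recursion is exactly what the single-join attempt above cannot express, which is why an iterative (or graph-based) approach is needed in Spark.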
Upvotes: 0
Views: 69
Reputation: 1757
I think you need an iterative algorithm so you can build up this dependency count:
from pyspark.sql.functions import col, lit, when

data = [("id1", "qid1"),
        ("id2", "id1"),
        ("id3", "qid3"),
        ("id4", "id2")]
df = spark.createDataFrame(data, ["id", "questionkey"])
df = df.withColumn('runOrder', lit(1))  # Set default order as 1 for all

index = 1
while True:
    qdf = df.filter(df.runOrder == index).select(df.id).alias("qdf")
    if qdf.count() == 0:
        # If no records changed in the last iteration, then break
        break
    index += 1
    df = df.join(qdf, df.questionkey == qdf.id, "left_outer")
    df = df.withColumn('runOrder',
                       when(col('qdf.id').isNotNull(), df.runOrder + 1)
                       .otherwise(df.runOrder)).drop(col('qdf.id'))
    df.cache()
df.orderBy('id').show()
+---+-----------+--------+
| id|questionkey|runOrder|
+---+-----------+--------+
|id1| qid1| 1|
|id2| id1| 2|
|id3| qid3| 1|
|id4| id2| 3|
+---+-----------+--------+
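If the dependency table is small enough to collect to the driver, the same propagation can be sketched in plain Python without the repeated self-joins (variable names here are just illustrative; like the loop above, this assumes the dependencies contain no cycles):

```python
# Sample rows of (varid, questionkey), matching the question's data
rows = [("id1", "qid1"), ("id2", "id1"), ("id3", "qid3"), ("id4", "id2")]
run_order = {varid: 1 for varid, _ in rows}  # default order 1 for every row
varids = set(run_order)

changed = True
while changed:  # keep bumping orders until a full pass changes nothing
    changed = False
    for varid, key in rows:
        # A row must run strictly after the row it depends on
        if key in varids and run_order[varid] <= run_order[key]:
            run_order[varid] = run_order[key] + 1
            changed = True

print(run_order)
# -> {'id1': 1, 'id2': 2, 'id3': 1, 'id4': 3}
```

Each pass plays the role of one join iteration in the Spark version: it raises the order of any row whose dependency has caught up with it, and stops once no order changes.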
Upvotes: 0