Reputation: 330
df_hrrchy
|lefId |Lineage |
|-------|--------------------------------------|
|36326 |["36326","36465","36976","36091","82"]|
|36121 |["36121","36908","36976","36091","82"]|
|36380 |["36380","36465","36976","36091","82"]|
|36448 |["36448","36465","36976","36091","82"]|
|36683 |["36683","36465","36976","36091","82"]|
|36949 |["36949","36908","36976","36091","82"]|
|37349 |["37349","36908","36976","36091","82"]|
|37026 |["37026","36908","36976","36091","82"]|
|36879 |["36879","36465","36976","36091","82"]|
df_trans
|tranID | T_Id |
|-----------|-------------------------------------------------------------------------|
|1000540 |["36121","36326","37349","36949","36380","37026","36448","36683","36879"]|
df_creds
|T_Id |T_val |T_Goal |Parent_T_Id |Parent_Val |parent_Goal|
|-------|-------|-------|---------------|----------------|-----------|
|36448 |100 |1 |36465 |200 |1 |
|36465 |200 |1 |36976 |300 |2 |
|36326 |90 |1 |36465 |200 |1 |
|36091 |500 |19 |82 |600 |4 |
|36121 |90 |1 |36908 |200 |1 |
|36683 |90 |1 |36465 |200 |1 |
|36908 |200 |1 |36976 |300 |2 |
|36949 |90 |1 |36908 |200 |1 |
|36976 |300 |2 |36091 |500 |19 |
|37026 |90 |1 |36908 |200 |1 |
|37349 |100 |1 |36908 |200 |1 |
|36879 |90 |1 |36465 |200 |1 |
|36380 |90 |1 |36465 |200 |1 |
Desired Result
T_id | children | T_Val | T_Goal | parent_T_id | parent_Goal | trans_id |
---|---|---|---|---|---|---|
36091 | ["36976"] | 500 | 19 | 82 | 4 | 1000540 |
36465 | ["36448","36326","36683","36879","36380"] | 200 | 1 | 36976 | 2 | 1000540 |
36908 | ["36121","36949","37026","37349"] | 200 | 1 | 36976 | 2 | 1000540 |
36976 | ["36465","36908"] | 300 | 2 | 36091 | 19 | 1000540 |
36683 | null | 90 | 1 | 36465 | 1 | 1000540 |
37026 | null | 90 | 1 | 36908 | 1 | 1000540 |
36448 | null | 100 | 1 | 36465 | 1 | 1000540 |
36949 | null | 90 | 1 | 36908 | 1 | 1000540 |
36326 | null | 90 | 1 | 36465 | 1 | 1000540 |
36380 | null | 90 | 1 | 36465 | 1 | 1000540 |
36879 | null | 90 | 1 | 36465 | 1 | 1000540 |
36121 | null | 90 | 1 | 36908 | 1 | 1000540 |
37349 | null | 100 | 1 | 36908 | 1 | 1000540 |
Code Tried
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import explode, collect_set, expr, col, collect_list,array_contains, lit
from functools import reduce
# Builds the node map one transaction at a time on the driver. This is the slow,
# row-by-row approach the question asks to replace; it is kept here in runnable form.
# NOTE(review): dataframe/column names follow the sample tables shown in the post
# (df_trans with columns tranID and T_Id) -- confirm against the real schema.
for row in df_trans.rdd.toLocalIterator():
    # def find_nodemap(row):
    dfs = []
    # Distinct set of every node appearing in the lineage of this transaction's leaves,
    # tagged with the transaction id.
    df_hy_set = (
        df_hrrchy.filter(df_hrrchy.lefId.isin(row["T_Id"]))
        .select(explode("Lineage").alias("Terrs"))
        .agg(collect_set(col("Terrs")).alias("hierarchy_list"))
        .select(F.lit(row["tranID"]).alias("trans_id"), "hierarchy_list")
    )
    # For every parent id, the list of its direct children that belong to the hierarchy set.
    df_childrens = (
        df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_id)"))
        .select("T_id", "T_Val", "T_Goal", "parent_T_id", "parent_Goal", "trans_id")
        .groupBy("parent_T_id")
        .agg(collect_list("T_id").alias("children"))
    )
    # Credential rows restricted to nodes in the hierarchy set.
    df_filter_creds = (
        df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_id)"))
        .select("T_id", "T_val", "T_Goal", "parent_T_id", "parent_Goal", "trans_id")
    )
    # Left join so that leaf nodes (which are nobody's parent) keep a null children column.
    df_nodemap = (
        df_filter_creds.alias("A")
        .join(df_childrens.alias("B"), col("A.T_id") == col("B.parent_T_id"), "left")
        .select(
            "A.T_id",
            "B.children",
            "A.T_val",
            "A.T_Goal",
            "A.parent_T_id",
            "A.parent_Goal",
            "A.trans_id",
        )
    )
    display(df_nodemap)
    # dfs.append(df_nodemap)
# df = reduce(DataFrame.union, dfs)
# display(df)
# # display(df)
My problem: this is a bad design. df_trans has millions of rows, and looping through the DataFrame row by row takes forever. Can I do this without looping? I tried a couple of other methods, but was not able to get the desired result.
Upvotes: 4
Views: 531
Reputation: 10703
You certainly need to process the entire DataFrame in batch, not iterate row by row.
The key point is to "reverse" `df_hrrchy`, i.e. from the parent lineage obtain the list of children for every `T_Id`:
// Invert the lineage: pair each node with its direct parent, then gather every parent's children.
val lineage = $"Lineage"
// Dropping the last element gives the "child" side and dropping the first gives the
// "parent" side, so the same position in both arrays is a direct child/parent pair.
val childSide = slice(lineage, lit(1), size(lineage) - 1)
val parentSide = slice(lineage, 2, 999999)
val df_relations = df_hrrchy
  .withColumn("children", childSide)
  .withColumn("parents", parentSide)
  .select(explode(arrays_zip($"children", $"parents")).as("rels"))
  .distinct
val df_children = df_relations
  .groupBy($"rels.parents".as("T_Id"))
  .agg(collect_set($"rels.children").as("children"))
df_children.show(false)
+-----+-----------------------------------+
|T_Id |children |
+-----+-----------------------------------+
|36091|[36976] |
|36465|[36448, 36380, 36326, 36879, 36683]|
|36976|[36465, 36908] |
|82 |[36091] |
|36908|[36949, 37349, 36121, 37026] |
+-----+-----------------------------------+
Then expand the list of `T_Id`s in `df_trans` and also include all `T_Id`s from the hierarchy:
// One row per (tranID, leaf T_Id) ...
val df_trans_exploded = df_trans.withColumn("T_Id", explode($"T_Id"))
// ... then widen each leaf to every node of any lineage that contains it.
val df_trans_map = df_trans_exploded
  .join(df_hrrchy, array_contains($"Lineage", $"T_Id"))
  .select($"tranID", explode($"Lineage").as("T_Id"))
  .distinct
df_trans_map.show(false)
+-------+-----+
|tranID |T_Id |
+-------+-----+
|1000540|36976|
|1000540|82 |
|1000540|36091|
|1000540|36465|
|1000540|36326|
|1000540|36121|
|1000540|36908|
|1000540|36380|
|1000540|36448|
|1000540|36683|
|1000540|36949|
|1000540|37349|
|1000540|37026|
|1000540|36879|
+-------+-----+
With this, it is just a simple join to obtain the final result:
// Attach the credential columns to every (tranID, T_Id) pair; nodes without credentials drop out.
val df_with_creds = df_trans_map.join(df_creds, Seq("T_Id"))
// Left outer join keeps leaf nodes, which have no children row, with a null children column.
val df_result = df_with_creds.join(df_children, Seq("T_Id"), "left_outer")
df_result.show(false)
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|T_Id |tranID |T_val|T_Goal|Parent_T_Id|Parent_Val|parent_Goal|children |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|36976|1000540|300 |2 |36091 |500 |19 |[36465, 36908] |
|36091|1000540|500 |19 |82 |600 |4 |[36976] |
|36465|1000540|200 |1 |36976 |300 |2 |[36448, 36380, 36326, 36879, 36683]|
|36326|1000540|90 |1 |36465 |200 |1 |null |
|36121|1000540|90 |1 |36908 |200 |1 |null |
|36908|1000540|200 |1 |36976 |300 |2 |[36949, 37349, 36121, 37026] |
|36380|1000540|90 |1 |36465 |200 |1 |null |
|36448|1000540|100 |1 |36465 |200 |1 |null |
|36683|1000540|90 |1 |36465 |200 |1 |null |
|36949|1000540|90 |1 |36908 |200 |1 |null |
|37349|1000540|100 |1 |36908 |200 |1 |null |
|37026|1000540|90 |1 |36908 |200 |1 |null |
|36879|1000540|90 |1 |36465 |200 |1 |null |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
Upvotes: 1
Reputation: 5155
You need to rewrite this to use the full cluster; using toLocalIterator means that you aren't fully utilizing the cluster for shared work.
The code below was not run, as you didn't provide a workable data set to test. If you do, I'll run the code to make sure it's sound.
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import explode, collect_set, expr, col, collect_list,array_contains, lit
from functools import reduce
# Explode the T_Id array of every transaction. This creates a lot of short-lived rows,
# but the work runs on the whole cluster instead of on the driver.
df_trans_expld = df_trans.select(df_trans.tranID, explode(df_trans.T_Id).alias("T_Id"))
# Explode every leaf's lineage into one row per (lefId, lineage node).
df_hrrchy_expld = df_hrrchy.select(df_hrrchy.lefId, explode(df_hrrchy.Lineage).alias("Lineage"))
# Joining the exploded data plays the role of the per-row isin() filter; grouping by
# tranID then yields one hierarchy_list per transaction.
df_hy_set = (
    df_trans_expld.join(df_hrrchy_expld, df_hrrchy_expld.lefId == df_trans_expld.T_Id, "left")
    .groupBy("tranID")
    .agg(collect_set(col("Lineage")).alias("hierarchy_list"))
    .select(col("tranID").alias("trans_id"), "hierarchy_list")
)
# Same logic as the per-row version from here down, except that trans_id is now part of
# the grouping and join keys so children of different transactions are never mixed.
df_childrens = (
    df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_id)"))
    .select("T_id", "T_Val", "T_Goal", "parent_T_id", "parent_Goal", "trans_id")
    .groupBy("trans_id", "parent_T_id")
    .agg(collect_list("T_id").alias("children"))
)
df_filter_creds = (
    df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_id)"))
    .select("T_id", "T_val", "T_Goal", "parent_T_id", "parent_Goal", "trans_id")
)
# Left join so that leaf nodes (which are nobody's parent) keep a null children column.
df_nodemap = (
    df_filter_creds.alias("A")
    .join(
        df_childrens.alias("B"),
        (col("A.trans_id") == col("B.trans_id")) & (col("A.T_id") == col("B.parent_T_id")),
        "left",
    )
    .select(
        "A.T_id",
        "B.children",
        "A.T_val",
        "A.T_Goal",
        "A.parent_T_id",
        "A.parent_Goal",
        "A.trans_id",
    )
)
# no need to append/union data as it's now just one dataframe df_nodemap
I'd have to look into this more, but I'm pretty sure you are pulling all the data through the driver (with your existing code), which will really slow things down; this version will make use of all executors to complete the work.
There may be another optimization to get rid of the `array_contains` (and use a join instead). I'd have to look at the explain plan to see if you could get even more performance out of it. I don't remember off the top of my head; you are avoiding a shuffle, so it may be better as is.
Upvotes: 1