Reputation: 1106
I am trying to match multiple columns from one data frame (df) to a multi-language dictionary (df_label) and extract the corresponding labels for each column.
Note: This is not a duplicate of the question "Join multiple columns from one table to single column from another table".
The following is an example of the df and df_label dataframes and the desired output:
df:
+---+---+
|  s|  o|
+---+---+
| s1| o1|
| s1| o3|
| s2| o1|
| s2| o2|
+---+---+
df_label:
+---+-----+----+
|  e| name|lang|
+---+-----+----+
| s1|s1_en|  en|
| s1|s1_fr|  fr|
| s2|s2_fr|  fr|
| o1|o1_fr|  fr|
| o1|o1_en|  en|
| o2|o2_fr|  fr|
+---+-----+----+
output:
+---+---+------+------+----+
|  s|  o|s_name|o_name|lang|
+---+---+------+------+----+
| s2| o1| s2_fr| o1_fr|  fr|
| s1| o1| s1_fr| o1_fr|  fr|
| s1| o1| s1_en| o1_en|  en|
| s2| o2| s2_fr| o2_fr|  fr|
+---+---+------+------+----+
In other words, I want to match both columns [s, o] from df against column e of df_label and find their corresponding labels in the different languages, as shown above.
The multi-language dictionary (df_label) is huge and columns [s, o] contain many duplicates, so performing two join operations is highly inefficient.
Is there any way that could be achieved without multiple joins?
FYI, this is what I did using multiple joins but I really don't like it.
from pyspark.sql.functions import col
df = spark.createDataFrame([('s1','o1'),('s1','o3'),('s2','o1'),('s2','o2')]).toDF('s','o')
df_label = spark.createDataFrame([('s1','s1_en','en'),('s1','s1_fr','fr'),('s2','s2_fr','fr'),('o1','o1_fr','fr'),('o1','o1_en','en'),('o2','o2_fr','fr')]).toDF('e','name','lang')
# first join: attach the label and language for column s
df = df.join(df_label,col('s')==col('e')).drop('e').withColumnRenamed('name','s_name').withColumnRenamed('lang','s_lang')
# second join: attach the label for column o, then keep only rows where both labels share a language
df = df.join(df_label,col('o')==col('e')).drop('e').withColumnRenamed('name','o_name').withColumnRenamed('lang','o_lang').filter(col('o_lang')==col('s_lang')).drop('o_lang').withColumnRenamed('s_lang','lang')
Upvotes: 2
Views: 138
Reputation: 1106
Building on what gaw suggested, this is my proposed solution.
The approach is to use only one join and then a conditional collect_list aggregate that checks whether each match came from the s column or the o column.
from pyspark.sql.functions import col, collect_list, explode, when
df = spark.createDataFrame([('s1','o1'),('s1','o3'),('s2','o1'),('s2','o2')]).toDF('s','o')
df_label = spark.createDataFrame([('s1','s1_en','en'),('s1','s1_fr','fr'),('s2','s2_fr','fr'),('o1','o1_fr','fr'),('o1','o1_en','en'),('o2','o2_fr','fr')]).toDF('e','name','lang')
df.join(df_label,(col('e')== col('s')) | (col('e') == col('o'))) \
.groupBy(['s','o','lang']) \
.agg(collect_list(when(col('e')==col('s'),col('name'))).alias('s_name')\
,collect_list(when(col('e')==col('o'),col('name'))).alias('o_name')) \
.withColumn('s_name',explode('s_name')).withColumn('o_name',explode('o_name')).show()
+---+---+----+------+------+
| s| o|lang|s_name|o_name|
+---+---+----+------+------+
| s2| o2| fr| s2_fr| o2_fr|
| s1| o1| en| s1_en| o1_en|
| s1| o1| fr| s1_fr| o1_fr|
| s2| o1| fr| s2_fr| o1_fr|
+---+---+----+------+------+
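To make the logic of the conditional aggregate easier to follow, here is a minimal plain-Python sketch of what the single join, the groupBy on (s, o, lang), the two conditional collect_list calls and the two explode calls compute on the sample data. It is only an illustration, not Spark code: the function name label_pairs and the list-of-tuples representation are assumptions made for this sketch, and it says nothing about how Spark executes the query.

```python
from collections import defaultdict

# Sample data copied from the question.
df = [('s1', 'o1'), ('s1', 'o3'), ('s2', 'o1'), ('s2', 'o2')]
df_label = [('s1', 's1_en', 'en'), ('s1', 's1_fr', 'fr'), ('s2', 's2_fr', 'fr'),
            ('o1', 'o1_fr', 'fr'), ('o1', 'o1_en', 'en'), ('o2', 'o2_fr', 'fr')]

def label_pairs(pairs, labels):
    """Hypothetical helper: models OR-join + groupBy + conditional collect + explode."""
    # One group per (s, o, lang), holding the collected s names and o names.
    groups = defaultdict(lambda: ([], []))
    for s, o in pairs:
        for e, name, lang in labels:
            if e == s or e == o:              # the single join condition
                s_names, o_names = groups[(s, o, lang)]
                if e == s:                    # when(col('e') == col('s'), col('name'))
                    s_names.append(name)
                if e == o:                    # when(col('e') == col('o'), col('name'))
                    o_names.append(name)
    rows = []
    for (s, o, lang), (s_names, o_names) in groups.items():
        # Exploding both lists yields their cross product; an empty list
        # yields no rows, which drops pairs lacking a label in this language.
        for s_name in s_names:
            for o_name in o_names:
                rows.append((s, o, lang, s_name, o_name))
    return sorted(rows)

result = label_pairs(df, df_label)
for row in result:
    print(row)
```

The pair (s1, o3) disappears because o3 has no label at all, and (s2, o1) keeps only its fr row because s2 has no en label, which matches the table above.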
Upvotes: 2
Reputation: 1960
I came up with an approach that needs only one join, but since it uses additional (expensive) operations such as explode, I am not sure whether it is faster.
Feel free to give it a try.
The following code produces the desired output:
from pyspark.sql.functions import col, collect_list, explode, when
df = spark.createDataFrame([('s1','o1'),('s1','o3'),('s2','o1'),('s2','o2')]).toDF('s','o')
df_label = spark.createDataFrame([('s1','s1_en','en'),('s1','s1_fr','fr'),('s2','s2_fr','fr'),('o1','o1_fr','fr'),('o1','o1_en','en'),('o2','o2_fr','fr')]).toDF('e','name','lang')
df = (
    df
    # combine the two join conditions into a single join
    .join(df_label, (col('s') == col('e')) | (col('o') == col('e')))
    .drop('e')
    # create the o_name and s_name columns (relies on the sample names starting with "o" or "s")
    .withColumn("o_name", when(col("name").startswith("o"), col("name")).otherwise(None))
    .withColumn("s_name", when(col("name").startswith("s"), col("name")).otherwise(None))
    # group to aggregate the required values
    .groupBy("s", "o")
    .agg(collect_list("o_name").alias("o_name"), collect_list("s_name").alias("s_name"))
    # explode the lists from the group to attach them to the correct pairs of o and s
    .select("s", "o", explode("o_name").alias("o_name"), "s_name")
    .select("s", "o", explode("s_name").alias("s_name"), "o_name")
    # manually create the o_lang and lang columns from the two-character name suffix
    .withColumn("o_lang", col("o_name").substr(-2, 2))
    .withColumn("lang", col("s_name").substr(-2, 2))
    .filter(col("o_lang") == col("lang"))
    .drop("o_lang")
)
Result:
+---+---+------+------+----+
|s |o |s_name|o_name|lang|
+---+---+------+------+----+
|s2 |o2 |s2_fr |o2_fr |fr |
|s2 |o1 |s2_fr |o1_fr |fr |
|s1 |o1 |s1_fr |o1_fr |fr |
|s1 |o1 |s1_en |o1_en |en |
+---+---+------+------+----+
Upvotes: 1