Reputation: 3782
The following code is used to extract ranks from the column products. The ranks are the second numbers in each pair [...]. For example, given [[222,66],[333,55]], the ranks are 66 and 55 for the products with PK 222 and 333, respectively.
But in Spark 2.2 the code runs very slowly when df_products is around 800 MB:
df_products.createOrReplaceTempView("df_products")

val result = df.as("df2")
  .join(spark.sql("SELECT * FROM df_products")
    .select($"product_PK", explode($"products").as("products"))
    .withColumnRenamed("product_PK", "product_PK_temp").as("df1"),
    $"df2.product_PK" === $"df1.product_PK_temp" and $"df2.rec_product_PK" === $"df1.products.product_PK", "left")
  .drop($"df1.product_PK_temp")
  .select($"product_PK", $"rec_product_PK", coalesce($"df1.products.col2", lit(0.0)).as("rank_product"))
This is a small sample of df_products and df:
df_products =
+----------+--------------------+
|product_PK| products|
+----------+--------------------+
| 111|[[222,66],[333,55...|
| 222|[[333,24],[444,77...|
...
+----------+--------------------+
df =
+----------+-----------------+
|product_PK| rec_product_PK|
+----------+-----------------+
| 111| 222|
| 222| 888|
+----------+-----------------+
The code above works well when the arrays in each row of products contain a small number of elements. But when the arrays in each row contain many elements [[..],[..],...], the code seems to get stuck and does not advance.
How can I optimize this code? Any help is highly appreciated.
Is it possible, for example, to transform df_products into the following DataFrame before joining?
df_products =
+----------+--------------------+------+
|product_PK| rec_product_PK| rank|
+----------+--------------------+------+
| 111| 222| 66|
| 111| 333| 55|
| 222| 333| 24|
| 222| 444| 77|
...
+----------+--------------------+------+
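For reference, a minimal sketch of producing that flattened shape, assuming products is an array of structs whose fields are named product_PK and col2 as implied by the question's own join code (verify the actual field names with df_products.printSchema()):

```scala
import org.apache.spark.sql.functions.{col, explode}

// Explode each row's products array into one row per pair, then pull the
// struct fields out as top-level columns. The field names product_PK and
// col2 are assumptions taken from the question's join condition.
val flatProducts = df_products
  .select(col("product_PK"), explode(col("products")).as("pair"))
  .select(
    col("product_PK"),
    col("pair.product_PK").as("rec_product_PK"),
    col("pair.col2").as("rank"))
```

After this, the lookup becomes a plain equi-join on (product_PK, rec_product_PK), which Spark can plan far more efficiently than a join whose condition reaches into an exploded struct.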
Upvotes: 0
Views: 131
Reputation: 13001
As per my answer here, you can transform df_products using something like this:
import org.apache.spark.sql.functions.explode
val df1 = df_products.withColumn("array_elem", explode(df_products("products")))
val df2 = df1.select("product_PK", "array_elem.*")
This assumes products is an array of structs. If products is an array of arrays, you can use the following instead:
val df2 = df1.withColumn("rank", df1("array_elem").getItem(1))
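If that works, the original lookup reduces to a plain left equi-join back onto df. A sketch, assuming df2 ends up with columns product_PK, rec_product_PK and rank as in the question's desired shape (the column names are taken from the question's sample data):

```scala
import org.apache.spark.sql.functions.{coalesce, lit}

// Left-join the flattened ranks onto df, defaulting missing ranks to 0.0
// to match the coalesce(..., lit(0.0)) in the question's original code.
val result = df
  .join(df2, Seq("product_PK", "rec_product_PK"), "left")
  .select(
    $"product_PK",
    $"rec_product_PK",
    coalesce($"rank", lit(0.0)).as("rank_product"))
```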
Upvotes: 1