Reputation: 809
I have a column called event_data in JSON format in my Spark DataFrame. After parsing it with from_json, I get this schema:
root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)
I only need af_content_id from this column. This attribute can come in different formats: a single string, a single integer, or a list mixing both, e.g. ['ghhjj23','123546',12356].
I want to use the explode function to return a new row for each element of af_content_id when it is a list. But when I apply it, I get an error:
from pyspark.sql.functions import explode

def get_content_id(column):
    return column.af_content_id

df_transf_1 = df_transf_1.withColumn(
    "products_basket",
    get_content_id(df_transf_1.event_data)
)
df_transf_1 = df_transf_1.withColumn(
    "product_id",
    explode(df_transf_1.products_basket)
)
cannot resolve 'explode(`products_basket`)' due to data type mismatch: input to function explode should be array or map type, not StringType;
I know the reason: the field af_content_id may contain different types. But I don't know how to resolve it. Using pyspark.sql.functions.array() directly on the column doesn't work, because the result becomes an array of arrays and explode does not produce the expected result.
Sample code to reproduce the step I'm stuck on:
import pandas as pd
arr = [
['b5ad805c-f295-4852-82fc-961a88',12732936],
['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
['0E3D17EA-BEEF-4931-8104','12909841'],
['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
]
df = pd.DataFrame(arr, columns = ['user_id','products_basket'])
df = df[['user_id','products_basket']].astype(str)
df_transf_1 = spark.createDataFrame(df)
I'm looking for a way to convert products_basket to a single format, an array, so that when I apply explode, each row contains one id.
Upvotes: 6
Views: 8535
Reputation: 43494
If you are starting with a DataFrame like:
df_transf_1.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id |products_basket |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88 |12732936 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |['Gklb38', '123655'] |
#|0E3D17EA-BEEF-4931-8104 |12909841 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+
where the products_basket column is a StringType:
df_transf_1.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: string (nullable = true)
You can't call explode on products_basket because it's not an array or map.
One workaround is to remove any leading/trailing square brackets and then split the string on ", " (comma followed by a space). This will convert the string into an array of strings.
from pyspark.sql.functions import col, regexp_replace, split

df_transf_new = df_transf_1.withColumn(
    "products_basket",
    split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
)
df_transf_new.show(truncate=False)
#+--------------------------------+------------------------------+
#|user_id |products_basket |
#+--------------------------------+------------------------------+
#|b5ad805c-f295-4852-82fc-961a88 |[12732936] |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |
#|0E3D17EA-BEEF-4931-8104 |[12909841] |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
#+--------------------------------+------------------------------+
The regular expression pattern matches any of the following:
  (^\[): An opening square bracket at the start of the string
  (\]$): A closing square bracket at the end of the string
  ('): Any single quote (because your strings are quoted)
and replaces these with an empty string.
This assumes that your data does not contain any needed single quotes or square brackets inside products_basket.
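To see what the pattern does without starting a Spark session, here is a minimal plain-Python sketch that applies the same regular expression and the same ", " split to the four sample strings. The helper name split_basket is hypothetical, and Python's re module stands in for Spark's Java regex engine; for this particular pattern the two should behave the same, but treat that as an assumption rather than a guarantee.

```python
import re

# Same pattern as in the regexp_replace call above.
PATTERN = r"(^\[)|(\]$)|(')"

def split_basket(raw):
    """Hypothetical helper: strip outer brackets and single quotes, then split on ', '."""
    return re.sub(PATTERN, "", raw).split(", ")

samples = [
    "12732936",
    "['Gklb38', '123655']",
    "12909841",
    "[12645715, 12909837, 12909837]",
]

results = [split_basket(s) for s in samples]
for raw, ids in zip(samples, results):
    print(raw, "->", ids)
```

A string without brackets passes through unchanged and becomes a one-element list, which is why the scalar rows end up as single-item arrays.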
After the split, the schema of the new DataFrame is:
df_transf_new.printSchema()
#root
# |-- user_id: string (nullable = true)
# |-- products_basket: array (nullable = true)
# | |-- element: string (containsNull = true)
Now you can call explode:
from pyspark.sql.functions import explode
df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
#+--------------------------------+------------------------------+----------+
#|user_id |products_basket |product_id|
#+--------------------------------+------------------------------+----------+
#|b5ad805c-f295-4852-82fc-961a88 |[12732936] |12732936 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |Gklb38 |
#|0FD6955D-484C-4FC8-8C3F-DA7D28 |[Gklb38, 123655] |123655 |
#|0E3D17EA-BEEF-4931-8104 |[12909841] |12909841 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837 |
#|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837 |
#+--------------------------------+------------------------------+----------+
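If the ids might themselves contain single quotes, brackets, or ", ", the regex approach would alter them. One possible alternative, not part of the approach above, is to parse each string as a Python literal with ast.literal_eval, which works here only because the sample strings were produced by pandas' astype(str). The sketch below is plain Python; parse_basket is a hypothetical name.

```python
import ast

def parse_basket(raw):
    """Hypothetical helper: return the ids contained in `raw` as a list of strings."""
    if raw is None:
        return []
    try:
        value = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # Not a valid Python literal (e.g. a bare id such as Gklb38): keep it as-is.
        return [raw]
    if isinstance(value, (list, tuple)):
        # A stringified list: convert every element to a string.
        return [str(v) for v in value]
    # A scalar such as "12732936": keep the original text untouched.
    return [raw]

print(parse_basket("[12645715, 12909837, 12909837]"))
print(parse_basket("['Gklb38', '123655']"))
print(parse_basket("12732936"))
```

To use it on the DataFrame, one option would be to wrap it as a UDF, for example udf(parse_basket, ArrayType(StringType())) with udf from pyspark.sql.functions and the types from pyspark.sql.types, and then call explode on the result. A Python UDF is generally slower than built-in functions such as regexp_replace and split, so this trade-off is probably only worthwhile when the regex assumption does not hold.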
Upvotes: 5