Reputation: 341
I am trying to get the keys and values of a column in which some rows contain JSON and others contain a plain string or None. I want each JSON key and its value stacked into two new columns, respectively.
The JSON can be nested, and I don't know its schema beforehand (how many keys there are or how deeply it is nested).
The data set has hundreds of millions of rows.
Sample data:
+----+----+------+----------------------------------------------------------+
|col1|col2|col3  |col4                                                      |
+----+----+------+----------------------------------------------------------+
|1   |A   |hello1|time1                                                     |
|2   |B   |hello2|None                                                      |
|3   |C   |hello3|{'world1': 'how are you?','world2':{'help me!':'please'}}|
|4   |D   |hello4|{'world3':'ola!'}                                         |
+----+----+------+----------------------------------------------------------+
Expected dataframe:
+----+----+------+------------+---------------------+
|col1|col2|col3  |new_col_keys|new_col_values       |
+----+----+------+------------+---------------------+
|1   |A   |hello1|time1       |Null                 |
|2   |B   |hello2|Null        |Null                 |
|3   |C   |hello3|world1      |how are you?         |
|3   |C   |hello3|world2      |{'help me!':'please'}|
|4   |D   |hello4|world3      |ola!                 |
+----+----+------+------------+---------------------+
Here I am adding separate new columns for the keys and the values, and dropping the original column.
NOTE: All columns in the sample data are of StringType.
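For reference, a minimal sketch of how the sample data might be built as a PySpark dataframe (column names and values are taken from the table above; the None row is assumed to be a true null):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# All four columns are strings; the JSON-like rows are stored as raw strings
df = spark.createDataFrame(
    [
        ("1", "A", "hello1", "time1"),
        ("2", "B", "hello2", None),
        ("3", "C", "hello3", "{'world1': 'how are you?','world2':{'help me!':'please'}}"),
        ("4", "D", "hello4", "{'world3':'ola!'}"),
    ],
    ["col1", "col2", "col3", "col4"],
)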
Upvotes: 0
Views: 1122
Reputation: 10035
You may try the following:
1. Import functions and types from pyspark.sql
2. Filter the rows where col4 does not (~) look LIKE a JSON dictionary/map, as map-type values have the pattern {key:value}, and store them in a dataframe df_simple
3. Filter the rows where col4 does look LIKE a JSON dictionary/map and store the results in a dataframe df_maps
4. Transform col4 to a MapType with string keys and string values using from_json
5. select to retrieve the desired columns and explode to get each key/value pair from the transformed MapType column col4 into a new row
6. Rename the resulting columns (key -> new_col_keys and value -> new_col_values) using withColumnRenamed
7. union df_simple and df_maps into the final data set output_df (using select in the union is optional, but it ensures that the right columns are used even if the dataframes change in the future)
NB. Sections related to the steps above are included in the code as comments.
# Step 1: imports
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Step 2: rows whose col4 is not a JSON-like map. A NULL col4 makes the LIKE
# test return NULL (not False), which would drop the row, so NULL rows are
# kept explicitly.
df_simple = df.where((~F.col("col4").like("{%}")) | F.col("col4").isNull())
df_simple.show() # only for debugging purposes
+----+----+------+-----+
|col1|col2| col3| col4|
+----+----+------+-----+
| 1| A|hello1|time1|
| 2| B|hello2| null|
+----+----+------+-----+
df_maps = (
    # Step 3: keep only the rows where col4 looks like a JSON map
    df.where(F.col("col4").like("{%}"))
    # Step 4: parse col4 into a map<string,string>; nested objects are kept
    # as their JSON text since the value type is StringType
    .withColumn("col4", F.from_json(
        F.col("col4"),
        T.MapType(T.StringType(), T.StringType())
    ))
    # Step 5: explode the map into one row per key/value pair
    .select(
        F.col("col1"),
        F.col("col2"),
        F.col("col3"),
        F.explode("col4")
    )
    # Step 6: rename the columns generated by explode
    .withColumnRenamed("key", "new_col_keys")
    .withColumnRenamed("value", "new_col_values")
)
df_maps.show(truncate=False) # only for debugging purposes
+----+----+------+------------+---------------------+
|col1|col2|col3 |new_col_keys|new_col_values |
+----+----+------+------------+---------------------+
|3 |C |hello3|world1 |how are you? |
|3 |C |hello3|world2 |{"help me!":"please"}|
|4 |D |hello4|world3 |ola! |
+----+----+------+------------+---------------------+
# Step 7: align df_simple's columns with df_maps and union the two
output_df = (
    df_simple.selectExpr("col1", "col2", "col3", "col4 as new_col_keys", "NULL as new_col_values")
    .union(
        df_maps.select("col1", "col2", "col3", "new_col_keys", "new_col_values")
    )
)
output_df.printSchema() # only for debugging
output_df.show(truncate=False) # only for debugging
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
|-- new_col_keys: string (nullable = true)
|-- new_col_values: string (nullable = true)
+----+----+------+------------+---------------------+
|col1|col2|col3 |new_col_keys|new_col_values |
+----+----+------+------------+---------------------+
|1 |A |hello1|time1 |null |
|2 |B |hello2|null |null |
|3 |C |hello3|world1 |how are you? |
|3 |C |hello3|world2 |{"help me!":"please"}|
|4 |D |hello4|world3 |ola! |
+----+----+------+------------+---------------------+
NB. If you would like to repeat this for the nested values, you could repeat the operations from Steps 3 - 7. However, this time instead of using col4 you would use new_col_values (which is where the nested JSON, e.g. {"help me!":"please"}, now lives), since col4 no longer exists in the transformed dataset.
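For instance, a rough sketch of one more level of unnesting, reusing F, T and output_df from above (df_nested, nested_keys and nested_values are just illustrative names):
# Reapply Steps 3 - 6 to the values that are themselves JSON-like maps
df_nested = (
    output_df.where(F.col("new_col_values").like("{%}"))
    # Step 4 equivalent: parse the nested value into a map<string,string>
    .withColumn("new_col_values", F.from_json(
        F.col("new_col_values"),
        T.MapType(T.StringType(), T.StringType())
    ))
    # Step 5 equivalent: one row per nested key/value pair
    .select("col1", "col2", "col3", "new_col_keys", F.explode("new_col_values"))
    # Step 6 equivalent: rename the columns generated by explode
    .withColumnRenamed("key", "nested_keys")
    .withColumnRenamed("value", "nested_values")
)
Since the nesting depth is not known beforehand, you could repeat this in a loop until no value matches the {%} pattern.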
Let me know if this works for you.
Upvotes: 1