Reputation: 225
I have data as below
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|asset_id |chg_log |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4455628986|[{'oldValues': [], 'newValues': ['COMPO_MAIL'], 'fieldName': 'Communication Type', 'fieldType': 'TEXT', 'lookupInfo': {'fieldType': 'TEXT'}, 'additional': {'GROUP_QUALIFIER': [{'qualifiers': [{'lookupKey': 'Communication_Type', 'value': 'COMPO_MAIL'}]}]}}, {'oldValues': [], 'lookupInfo': {'isClientLevel': False, 'fieldType': 'DATE'}, 'fieldName': 'Delivery Due Date', 'fieldType': 'DATE', 'newValues': ['1601771520000']}, {'oldValues': [], 'lookupInfo': {'lookupType': 'CUST_ID', 'fieldType': 'CUST_ID'}, 'fieldName': 'Customer Id', 'fieldType': 'CUST_ID', 'newValues': ['10486']}, {'oldValues': [], 'lookupInfo': {'isClientLevel': False, 'fieldType': 'DROPDOWN'}, 'fieldName': 'Process_Status', 'fieldType': 'PICKLIST', 'newValues': ['Request Review']}] |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
From the JSON string in the column chg_log, I'd like to extract the entry whose fieldName is Process_Status together with its newValues, i.e. Request Review. The expected result is below.
+----------+---------------+
|asset_id |Process_Status |
+----------+---------------+
|4455628986|Request Review |
+----------+---------------+
The JSON is not in the same order every time. Sometimes Process_Status comes first in the JSON string, then Communication Type, and so on.
I tried using the json_extract function but couldn't get it to work.
How can I achieve this in Spark SQL, PySpark, or Hive? Can someone help me?
Thanks in advance
Upvotes: 2
Views: 1675
Reputation: 38335
In Hive there is very limited support for JSON processing: get_json_object returns a string even when the result is a JSON array or map, and JSONPath filtering does not work. This is why you need to extract, split, explode, and filter everything in the query.
For example, get_json_object(chg_log,"$.[].fieldName") returns:
["Communication Type","Delivery Due Date","Customer Id","Process_Status"]
It looks like an array, but it is a STRING; you need to split it to get an array.
Also, your JSON is not valid: the single quotes should be double quotes, and False should be lowercased, i.e. "isClientLevel": false, not 'isClientLevel': False.
Demo with your example (see the comments in the code):
with mytable as(
select 4455628986 asset_id , "[{'oldValues': [], 'newValues': ['COMPO_MAIL'], 'fieldName': 'Communication Type', 'fieldType': 'TEXT', 'lookupInfo': {'fieldType': 'TEXT'}, 'additional': {'GROUP_QUALIFIER': [{'qualifiers': [{'lookupKey': 'Communication_Type', 'value': 'COMPO_MAIL'}]}]}}, {'oldValues': [], 'lookupInfo': {'isClientLevel': False, 'fieldType': 'DATE'}, 'fieldName': 'Delivery Due Date', 'fieldType': 'DATE', 'newValues': ['1601771520000']}, {'oldValues': [], 'lookupInfo': {'lookupType': 'CUST_ID', 'fieldType': 'CUST_ID'}, 'fieldName': 'Customer Id', 'fieldType': 'CUST_ID', 'newValues': ['10486']}, {'oldValues': [], 'lookupInfo': {'isClientLevel': False, 'fieldType': 'DROPDOWN'}, 'fieldName': 'Process_Status', 'fieldType': 'PICKLIST', 'newValues': ['Request Review']}]" chg_log
)
select s.asset_id, split(s.newValues,'","')[f.pos] as Process_Status
from
(
    select asset_id,
           --extract the arrays of fieldName and newValues;
           --they are returned as strings, not arrays,
           --which is why we need to split and explode them later;
           --also strip the leading [" and trailing "]
           regexp_replace(get_json_object(chg_log,"$.[].fieldName"),'^\\["|"\\]$','') fieldName,
           regexp_replace(get_json_object(chg_log,"$.[].newValues"),'^\\["|"\\]$','') newValues
    from
    (
        --Fix the invalid JSON:
        --replace single quotes with double quotes, convert False to false and True to true;
        --add more fixes here if necessary
        select asset_id,
               regexp_replace(regexp_replace(regexp_replace(chg_log, "'",'"'),'False','false'),'True','true') chg_log
        from mytable
    ) s
) s lateral view outer posexplode(split(fieldName,'","')) f as pos, field_name
where f.field_name = 'Process_Status' --filter on the exploded fieldName
Result:
asset_id process_status
4455628986 Request Review
Upvotes: 1
Reputation: 32710
Your column chg_log seems to be a stringified Python dict rather than a valid JSON string.
In PySpark, you can use a UDF to transform the dict into JSON, then convert it to an array of structs using from_json, and finally filter the array to find the field Process_Status:
import ast
import json

from pyspark.sql import functions as F

# convert the stringified Python dict to a valid JSON string
dict_to_json = F.udf(lambda x: json.dumps(ast.literal_eval(x)))

df = df.withColumn("chg_log", dict_to_json(F.col("chg_log")))
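# For illustration on a small fragment from your data: ast.literal_eval parses the
# Python-literal syntax (single quotes, False/True) and json.dumps re-serializes it as valid JSON:
#   ast.literal_eval("{'isClientLevel': False}")              -> {'isClientLevel': False}
#   json.dumps(ast.literal_eval("{'isClientLevel': False}"))  -> '{"isClientLevel": false}'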
# infer the struct schema from the first row, parse the JSON array of structs,
# keep only the Process_Status entry and select its newValues
df1 = df.withColumn(
    "chg_log",
    F.from_json("chg_log", F.schema_of_json(df.select("chg_log").head()[0]))
).withColumn(
    "chg_log",
    F.expr("filter(chg_log, x -> x.fieldName = 'Process_Status')")[0]
).select(
    F.col("asset_id"), F.col("chg_log.newValues").alias("Process_Status")
)
df1.show()
# +----------+----------------+
# | asset_id| Process_Status|
# +----------+----------------+
# |4455628986|[Request Review]|
# +----------+----------------+
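Note that newValues is itself an array, so Process_Status comes back as a single-element array rather than the plain string in your expected output. A small optional addition (assuming newValues always holds exactly one value) takes the first element:

df1 = df1.withColumn("Process_Status", F.col("Process_Status")[0])
df1.show()
# +----------+--------------+
# |  asset_id|Process_Status|
# +----------+--------------+
# |4455628986|Request Review|
# +----------+--------------+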
Another way is to do the lookup directly in the UDF:
from pyspark.sql.types import ArrayType, StringType

# parse the stringified dict and return the newValues of the Process_Status entry
parse_status = F.udf(
    lambda x: next(i["newValues"] for i in ast.literal_eval(x) if i["fieldName"] == "Process_Status"),
    ArrayType(StringType())
)

df1 = df.select(F.col("asset_id"), parse_status(F.col("chg_log")).alias("Process_Status"))
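This should return the same single-element array as the first approach, so apply the same [0] indexing if you need the plain Request Review string.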
Upvotes: 1