I'm trying to load JSON data from an API source using Azure Data Factory. I'm getting the error below, which is about a bad JSON escape sequence.
ErrorCode=JsonInvalidDataFormat,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Error occurred when deserializing source JSON file 'api/vc/report/16277292d17ac20a5d0d073d0f8fa45bf848a89e2e0c9fcebb707f187fcf9cdce88ceb70c'. Check if the data is in valid JSON object format.,Source=Microsoft.DataTransfer.Common,''Type=Newtonsoft.Json.JsonReaderException,Message=Bad JSON escape sequence: \A. Path 'Data[43999].work_order', line 1, position 39435083.,Source=Newtonsoft.Json,'
I get the above error when the destination dataset has the encoding UTF-8 without BOM.
If I set the encoding to UTF-8 in the destination dataset in ADF, the data loads successfully to ADLS as a JSON file, but I'm unable to read/parse the JSON file in Databricks/Data Flow (I tried reading it in both text and JSON formats). I get an error like: AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default)...
Is there any way to replace the invalid escape sequence while reading from the source in ADF? Or is there any other solution to handle this situation?
This is the sample data:
{
"Report ID": "123",
"Report Name": "JSON_1",
"Data as of Date": "2024-01-18 01:28 PM",
"Report Date": "01/18/2024 08:27 AM",
"Report Header": "",
"Report Footer": "",
"Data": [
{
"ID": "01",
"Supplier": "COMPANY",
"Type": "Standard",
"Work_ID": "fg",**"work_order":"T\A Harvey"**,
"Purchase_Order_Number": "2",
"Statement_of_Work_Status": "Approved",
"Primary Cost Object": "3 | BASE",
"Primary Cost Object Code": "10014",
"Item": "Limit ",
"Item_ID": "rg23",
"Item_Status": "R",
"S_Worker_ID": "",
"Worker_ID": "",
"Worker_Name": "",
"Master_Number": "",
"Role_Code": "",
"Role_Level": "",
"Invoice_ID": "",
"Item_Submit_Date": "12/09/2023 12:39 PM"....15 more columns
}
]
}
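For reference, the failure is reproducible outside ADF, since \A is not a legal JSON escape sequence. A minimal check in plain Python, using the field value from the sample above:

import json

# \\A in the Python source is a literal backslash followed by "A" - an
# invalid JSON escape, so this raises json.decoder.JSONDecodeError.
json.loads('{"work_order": "T\\A Harvey"}')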
I tried the below approach, using Databricks to replace the escape characters before using the data as a source in ADF.
As you mentioned, you can't change the data at the source side, so you have to take the loaded JSON data (UTF-8), replace the escape character, then parse it and store it back.
You can follow the steps below:
Step 1: Take the source JSON data as a string in Azure Databricks (a sample literal is used here).
json_data = '''
{
"Report ID": "123",
"Report Name": "JSON_1",
"Data as of Date": "2024-01-18 01:28 PM",
"Report Date": "01/18/2024 08:27 AM",
"Report Header": "",
"Report Footer": "",
"Data": [
{
"ID": "01",
"Supplier": "COMPANY",
"Type": "Standard",
"Work_ID": "fg",
"work_order": "T\\A Harvey",
"Purchase_Order_Number": "2",
"Statement_of_Work_Status": "Approved",
"Primary Cost Object": "3 | BASE",
"Primary Cost Object Code": "10014",
"Item": "Limit ",
"Item_ID": "rg23",
"Item_Status": "R",
"S_Worker_ID": "",
"Worker_ID": "",
"Worker_Name": "",
"Master_Number": "",
"Role_Code": "",
"Role_Level": "",
"Invoice_ID": "",
"Item_Submit_Date": "12/09/2023 12:39 PM"
}
]
}
'''
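In practice the string would come from the file ADF landed in ADLS rather than a literal. A minimal sketch, assuming a placeholder path in the same style as the write path below, that reads the whole file as plain text so Spark's JSON parser never sees the bad escape:

# Read the landed file as a single text value; wholetext=True keeps the
# entire (single-line) document in one row instead of splitting on newlines.
raw_df = spark.read.text("abfss://[email protected]/exmpl_raw.json", wholetext=True)
json_data = raw_df.first()["value"]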
Step 2: Replace the escape characters in the DataFrame.
from pyspark.sql.types import StructType, StructField, StringType

# Remove the backslash escape characters from the raw JSON string.
json_data = json_data.replace('\\', '')

# Keep "Data" as a plain string so the nested array survives as raw JSON text.
schema = StructType([
    StructField("Report ID", StringType(), True),
    StructField("Report Name", StringType(), True),
    StructField("Data as of Date", StringType(), True),
    StructField("Report Date", StringType(), True),
    StructField("Report Header", StringType(), True),
    StructField("Report Footer", StringType(), True),
    StructField("Data", StringType(), True)
])

df = spark.read.json(spark.sparkContext.parallelize([json_data]), schema=schema)
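One caveat, not part of the original approach: replace('\\', '') strips every backslash, including legitimate escapes such as \" or \n. If the payload can contain valid escapes, a narrower sketch is to drop only backslashes that don't start a legal JSON escape:

import re

# Keep legal JSON escapes (\" \\ \/ \b \f \n \r \t \uXXXX) and drop the
# backslash from anything else, so T\A becomes TA but \n is preserved.
json_data = re.sub(r'\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})', '', json_data)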
Step 3: Write the DataFrame back out as JSON and store it in ADLS.
file_path = "abfss://[email protected]/exmpl.json"
df.write.mode("overwrite").json(file_path)
Results:
Report ID: 123
Report Name: JSON_1
Data as of Date: 2024-01-18 01:28 PM
Report Date: 01/18/2024 08:27 AM
Report Header: ""
Report Footer: ""
Data: [{"ID":"01","Supplier":"COMPANY","Type":"Standard","Work_ID":"fg","work_order":"TA Harvey","Purchase_Order_Number":"2","Statement_of_Work_Status":"Approved","Primary Cost Object":"3 | BASE","Primary Cost Object Code":"10014","Item":"Limit ","Item_ID":"rg23","Item_Status":"R","S_Worker_ID":"","Worker_ID":"","Worker_Name":"","Master_Number":"","Role_Code":"","Role_Level":"","Invoice_ID":"","Item_Submit_Date":"12/09/2023 12:39 PM"}]
The above code replaces the escape characters in the JSON data, converts the JSON string to a DataFrame, and writes it to ADLS.
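Because "Data" is kept as a raw JSON string in this schema, a follow-up sketch to explode it into one row per record (inner_schema lists only three of the ~20 inner columns; extend it with the remaining fields from the sample record):

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Schema for the nested "Data" array; add the remaining inner fields as needed.
inner_schema = ArrayType(StructType([
    StructField("ID", StringType(), True),
    StructField("Supplier", StringType(), True),
    StructField("work_order", StringType(), True),
]))

# Parse the JSON string into an array of structs, then flatten to rows.
rows_df = (df.withColumn("Data", from_json(col("Data"), inner_schema))
             .select("Report ID", explode("Data").alias("row"))
             .select("Report ID", "row.*"))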
Additionally, Azure Databricks can load JSON data from an API directly using the Python requests package.
Below is an example:
import requests

# Fetch the JSON payload from the API endpoint.
resp = requests.get('https://reqres.in/api/users?page=1,name,href')

# Distribute the response text as a one-element RDD and parse it as JSON.
db1 = spark.sparkContext.parallelize([resp.text])
df2 = spark.read.json(db1)
Here, the requests module allows Python code to send HTTP requests and retrieves the data from the API endpoint. spark.sparkContext.parallelize distributes the response text as a Spark RDD (Resilient Distributed Dataset), and spark.read.json reads the JSON from that RDD.
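Applied to your case, the same pattern could fetch the report and clean the escape sequence in one pass, before Spark ever parses it (the URL below is a placeholder for your real API endpoint and auth handling):

import requests

# Placeholder endpoint - substitute the real report URL and any auth headers.
resp = requests.get('https://<your-api-host>/api/vc/report/<report_id>')

# Same blanket backslash removal as above, applied before parsing.
cleaned = resp.text.replace('\\', '')
df3 = spark.read.json(spark.sparkContext.parallelize([cleaned]))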