Reputation: 354
We have a requirement to load a JSON array into Azure Storage from Databricks. After that, the stored JSON array has to be read and written back to an Azure SQL DB from ADF.
Below is my sample JSON produced by ADB. How do I convert it into a dataframe and write it back to storage?
[{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]
The above JSON needs to be loaded into storage in a proper format such as Parquet or Delta.
Then we have to read this data from ADF and load it into the SQL DB.
Sample structure and expected table details:
adf_log_id | adf_id | e_name  | e_desc                          | status  | failure_msg
-----------|--------|---------|---------------------------------|---------|------------
1          | 1      | pipenam | {input and output details JSON} | success | Failure
Upvotes: 0
Views: 278
Reputation: 8382
To convert the JSON into a dataframe and write it back to the storage account in Parquet or Delta format, you need to define the JSON schema explicitly and pass it to createDataFrame.
from pyspark.sql.types import StructType, StructField, StringType, MapType

# "Input" is a nested object with string (or null) values, so model it as a map
# rather than a plain string; "Output", "Failure" and "s" stay as strings.
schema = StructType([
    StructField("Details", StructType([
        StructField("Input", MapType(StringType(), StringType()), nullable=True),
        StructField("Output", StringType(), nullable=True)
    ])),
    StructField("Failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])
newJson = [{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]
df = spark.createDataFrame(data=newJson, schema=schema)
df.show(truncate=False)
Output:
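If you also want to shape the rows like the expected table (e_desc holding the input/output JSON, plus status and failure columns), a minimal sketch using to_json could look like the following. The column names and the mapping of s to status are assumptions based on the sample table in the question:
from pyspark.sql.functions import to_json, col

# Sketch: serialize the Details struct back to a JSON string for e_desc
# and rename the remaining fields to match the target table.
flat_df = df.select(
    to_json(col("Details")).alias("e_desc"),
    col("s").alias("status"),
    col("Failure").alias("failure_msg")
)
flat_df.show(truncate=False)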
To load this dataframe into storage in a proper format such as Parquet, use the code below:
storage_account_name = "xxxxxx"
storage_account_access_key = "access key"
spark.conf.set("fs.azure.account.key."+storage_account_name+".blob.core.windows.net",storage_account_access_key)
output_path = "wasbs://<container-name>@" + storage_account_name + ".blob.core.windows.net/dataframe.parquet"
# Write the DataFrame to Azure Storage as Parquet
df.write.parquet(output_path, mode="overwrite")
After this, read the parquet file from the storage account and write it into the SQL database using ADF (for example, with a Copy activity that uses the parquet file as the source and Azure SQL Database as the sink).
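If Delta is preferred over Parquet (the question allows either), the same DataFrame can be written in Delta format instead; the container name below is an assumed placeholder:
# Alternative: write the same DataFrame as a Delta table (Delta Lake ships with Databricks)
delta_output_path = "wasbs://<container-name>@" + storage_account_name + ".blob.core.windows.net/dataframe_delta"
df.write.format("delta").mode("overwrite").save(delta_output_path)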
Upvotes: 0