Developer Rajinikanth

Reputation: 354

Azure Databricks: store a JSON array into Azure Storage, then iterate over it and insert into Azure SQL using Azure Data Factory

We have a requirement to load a JSON array from Databricks into Azure Storage. After that, the stored JSON array has to be read and written to an Azure SQL DB from ADF.

Below is the sample JSON produced by ADB. How can it be converted into a DataFrame and written back to storage?

[{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

The JSON above needs to be loaded into storage in a proper format such as Parquet or Delta.

Then we have to read this data from ADF and load it into the SQL DB.

Sample structure and expected table details:

adf_log_id | adf_id | e_name  | e_desc                          | status  | failure_msg
-----------|--------|---------|---------------------------------|---------|------------
1          | 1      | pipenam | {input and output details JSON} | success | Failure
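
For illustration, here is a minimal PySpark sketch of how the nested sample JSON could be flattened into the columns above, assuming a DataFrame df already holds the parsed JSON (as built in the answer below); adf_log_id is generated with monotonically_increasing_id() and the pipeline name is a literal placeholder, both assumptions for this example.

from pyspark.sql import functions as F

# Sketch only: map the nested JSON fields onto the expected log-table columns.
# `df` is assumed to be the DataFrame created from the sample JSON.
log_df = df.select(
    F.monotonically_increasing_id().alias("adf_log_id"),   # surrogate key (placeholder)
    F.col("Details.Input")["id"].alias("adf_id"),
    F.lit("pipenam").alias("e_name"),                       # pipeline name (placeholder)
    F.to_json(F.col("Details")).alias("e_desc"),            # input/output details as JSON text
    F.when(F.col("s") == "s", "success").otherwise("failure").alias("status"),
    F.col("Failure").alias("failure_msg"),
)
log_df.show(truncate=False)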

Upvotes: 0

Views: 278

Answers (1)

Pratik Lad

Reputation: 8382

To convert the JSON into a DataFrame and write it back to the storage account in Parquet or Delta format:

To convert the JSON into a DataFrame, you need to define the JSON schema explicitly and pass it to createDataFrame.

from pyspark.sql.types import StructType, StructField, StringType, MapType

# "Input" is a nested object in the sample, so it is modelled as a map of string keys/values;
# "Output" is already a JSON string, so it stays a plain string column.
schema = StructType([
    StructField("Details", StructType([
        StructField("Input", MapType(StringType(), StringType()), nullable=True),
        StructField("Output", StringType(), nullable=True)
    ])),
    StructField("Failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])

newJson = [{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

df = spark.createDataFrame(data=newJson, schema=schema)
df.show(truncate=False)

Output: (screenshot of the df.show() result)

To load this DataFrame into storage in a proper format such as Parquet, use the code below:

storage_account_name = "xxxxxx"
storage_account_access_key = "access key"
container_name = "yourcontainer"

spark.conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", storage_account_access_key)
output_path = "wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net/dataframe.parquet"

# Write the DataFrame to Azure Blob Storage as Parquet
df.write.parquet(output_path, mode="overwrite")
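
Since the question also mentions Delta, a minimal sketch of the equivalent Delta write (Delta Lake is available out of the box on Databricks; delta_path below is just an assumed location following the same convention):

delta_path = "wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net/dataframe_delta"

# Write the same DataFrame as a Delta table instead of Parquet
df.write.format("delta").mode("overwrite").save(delta_path)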

After this, read the Parquet file from the storage account and write it into the SQL database using ADF, for example with a Copy activity as sketched below.
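
A rough sketch of what the Copy activity could look like in ADF pipeline JSON, assuming a Parquet dataset over the storage container and an Azure SQL sink dataset already exist in the factory (the names ParquetSourceDataset and AzureSqlSinkDataset are placeholders):

{
    "name": "CopyParquetToSql",
    "type": "Copy",
    "inputs": [ { "referenceName": "ParquetSourceDataset", "type": "DatasetReference" } ],
    "outputs": [ { "referenceName": "AzureSqlSinkDataset", "type": "DatasetReference" } ],
    "typeProperties": {
        "source": { "type": "ParquetSource" },
        "sink": { "type": "AzureSqlSink" }
    }
}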

Upvotes: 0
