Developer Rajinikanth

Reputation: 354

Azure Data Factory Parquet File Read non-primitive issues

I am trying to read a Parquet file in Azure Data Factory, but we get non-primitive type errors both when loading the dataset and when running the copy activity. Kindly help us.

Before ADF is used, the data is written by a Databricks notebook to an Azure Storage location; ADF then tries to load that Parquet file.


The Databricks input is in the below format:

[{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

from pyspark.sql.functions import to_json, col
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("desc", StringType(), nullable=True),
    StructField("Details", StructType([
        StructField("Input", StringType(), nullable=True),
        StructField("Output", StringType(), nullable=True)
    ])),
    StructField("Failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])

newJson = [{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

df=spark.createDataFrame(data=newJson,schema=schema)
display(df)
df.coalesce(1).write.parquet(f"{adls_url}/test/", mode="overwrite")
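
For reference, printing the schema of this DataFrame shows that Details is a nested struct rather than a primitive column:

# Quick check of what ends up in the Parquet file: Details is a nested struct,
# which is a non-primitive type.
df.printSchema()
# root
#  |-- id: string (nullable = false)
#  |-- name: string (nullable = true)
#  |-- desc: string (nullable = true)
#  |-- Details: struct (nullable = true)
#  |    |-- Input: string (nullable = true)
#  |    |-- Output: string (nullable = true)
#  |-- Failure: string (nullable = true)
#  |-- s: string (nullable = true)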

After writing the JSON-format data to the Parquet table, we get errors while reading the file in ADF.

Upvotes: 0

Views: 534

Answers (2)

Hick

Reputation: 459

If the error mentions the Delta transaction log (txn), you might be reading all files recursively: Databricks Delta writes a _delta_log folder alongside the Parquet files, and within this folder there are additional files with extra columns.

If so, clear the Recursively option on the source of the Copy data activity.

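To check whether the folder actually contains Delta transaction logs, you could list it from a Databricks notebook; a minimal sketch, using the adls_url variable from the question:

# List the folder ADF is reading from; a _delta_log directory indicates the
# table was written as Delta, and its log files get picked up when the
# source reads recursively.
for f in dbutils.fs.ls(f"{adls_url}/test/"):
    print(f.name)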

Upvotes: 0

Bhavani

Reputation: 5317

When I tried to replicate the issue, I got the same error in my environment.


As per the Parquet format documentation for ADF, the copy activity maps only primitive data types for Parquet files; complex types such as MAP, LIST, and STRUCT are not supported.


In your data, the Details column is read as the charArray data type, which is non-primitive and not supported for Parquet files. So, you can rebuild your Parquet file with a schema that uses only primitive data types, using the following code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("PrimitiveSchemaDataFrame").getOrCreate()

# Define the schema with primitive data types
primitive_schema = StructType([
    StructField("id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("desc", StringType(), nullable=True),
    StructField("details_input_id", StringType(), nullable=True),
    StructField("details_input_name", StringType(), nullable=True),
    StructField("details_output", StringType(), nullable=True),
    StructField("failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])

# Define the JSON data as a list of dictionaries
json_data = [
    {
        'id': '1',
        'name': 'asdsdasd',
        'desc': None,
        'details_input_id': '1',
        'details_input_name': 'asdsdasd',
        'details_output': '{"msg":"some error"}',
        'failure': '{"msg":"error"}',
        's': 'f'
    },
    {
        'id': '2',
        'name': 'sadsadsad',
        'desc': None,
        'details_input_id': '2',
        'details_input_name': 'sadsadsad',
        'details_output': '{"s":"2"}',
        'failure': '',
        's': 's'
    }
]

# Create the DataFrame with the schema and JSON data
df = spark.createDataFrame(json_data, schema=primitive_schema)

df.coalesce(1).write.parquet("abfss://<containerName>@<storageAccount>.dfs.core.windows.net/parq/", mode="overwrite")

You will get the Parquet file written to the target folder.


Try to import the schema with the above Parquet file as the source dataset, and you will be able to import it successfully.

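As an alternative to re-keying the rows by hand, you could also flatten the original nested DataFrame from the question directly; a minimal sketch, assuming nested_df is the DataFrame built with the nested schema (the flattened column names are illustrative):

from pyspark.sql.functions import col

# Pull the struct fields up to top-level string columns so only primitive
# types remain in the Parquet schema.
flat_df = nested_df.select(
    col("id"),
    col("name"),
    col("desc"),
    col("Details.Input").alias("details_input"),
    col("Details.Output").alias("details_output"),
    col("Failure").alias("failure"),
    col("s")
)
flat_df.printSchema()  # every column is now a plain string
flat_df.coalesce(1).write.parquet("abfss://<containerName>@<storageAccount>.dfs.core.windows.net/parq/", mode="overwrite")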

Upvotes: 0
