JohnM

Reputation: 5

Airflow: How to Load an XML File to BigQuery?

I am trying to write an Airflow DAG that will load an .XML file to BigQuery using a Python method, but I believe the file needs to be converted to .json first for this to work.

I have written the following code to convert an .XML file to .json, but I am unsure if this is correct, and how I can write a Python method to achieve this:

import json, xmltodict

def load_xml_to_bq():
    with open("xml_file.xml") as xml_file:
        data_dict = xmltodict.parse(xml_file.read())
        xml_file.close()

    json_data = json.dumps(data_dict)

    with open("data.json", "w") as json_file:
        json_file.write(json_data)
        json_file.close()

Additionally, does the .XML file need to be loaded from a GCS Bucket for it to be inserted into BigQuery?

Furthermore, would the following code need to be added to my function for the .XML to be inserted into BigQuery?

    client = bigquery.Client()
    client.insert_rows_json(f'{dataset}.{table}', dec_sale_list)

Thanks - any guidance on how to accomplish this would be appreciated; I feel that I have some of the correct concepts, but I am not sure what I need to add or remove to do this.

Upvotes: 0

Views: 1248

Answers (2)

Mazlum Tosun

Reputation: 6572

You can also use the two following solutions.

Solution 1:

  • PythonOperator to transform your XML file to JSON with pure Python code and a lib called xmltodict
  • Then use GCSToBigQueryOperator to load the JSON file into BigQuery

import os

import airflow
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def transform_xml_file_to_json():
    import json, xmltodict
    from google.cloud import storage

    with open(xml_file_path_gcs) as xml_file:
        data_dict = xmltodict.parse(xml_file.read())

    json_data = json.dumps(data_dict)

    client = storage.Client()
    bucket = client.get_bucket("your-bucket")
    blob = bucket.blob('your_file.json')
    blob.upload_from_string(data=json_data, content_type='application/json')

with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:

    xml_to_json = PythonOperator(
        task_id="transform_xml_to_json",
        python_callable=transform_xml_file_to_json
    )

    gcs_to_bq = GCSToBigQueryOperator(
        task_id="gcs_to_bq",
        bucket="your-bucket",
        source_objects=['*.json'],
        destination_project_dataset_table="your_table",
        schema_object=f"dags/schema/creditsafe/{data_type}.json",
        source_format="NEWLINE_DELIMITED_JSON",
        ....,
    )

    xml_to_json >> gcs_to_bq

  • The first operator transforms the XML to a JSON file with the xmltodict lib and uploads the JSON file to GCS with the Python client
  • The second operator loads the JSON file to BigQuery

Solution 2:

All the work is done in the PythonOperator:

  • Load the XML file
  • Transform it to a list of dicts
  • Use the Python client and the insert_rows_json method

import os

import airflow
from airflow.operators.python import PythonOperator

dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def load_xml_file_to_bq():
    import xmltodict
    from google.cloud import bigquery

    with open(xml_file_path_gcs) as xml_file:
        data_dicts = xmltodict.parse(xml_file.read())

    # Check if you have to transform data_dicts into the expected list of dicts:
    # each dict needs to match the schema of the BQ table exactly.
    client = bigquery.Client()
    client.insert_rows_json('dataset.table', data_dicts)


with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:

    load_xml_file_to_bq_task = PythonOperator(
        task_id="load_xml_file_to_bq",
        python_callable=load_xml_file_to_bq
    )

    load_xml_file_to_bq_task

Check whether you have to transform data_dicts into the expected list of dicts: each dict in the list needs to match the schema of the BigQuery table exactly.
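For illustration, a minimal sketch of that transformation, assuming a hypothetical XML layout of the form <sales><sale>...</sale></sales> whose fields match the table columns (adapt the keys to your real file):

    def to_row_dicts(data_dicts):
        # Hypothetical structure: xmltodict returns {"sales": {"sale": [...]}}
        # for <sales><sale>...</sale></sales>.
        sales = data_dicts["sales"]["sale"]

        # xmltodict returns a single dict (not a list) when there is only one <sale>.
        if isinstance(sales, dict):
            sales = [sales]

        # Keep only the fields that exist as columns in the BigQuery table,
        # casting values to the column types.
        return [
            {"id": s["id"], "amount": float(s["amount"]), "date": s["date"]}
            for s in sales
        ]

You can then pass the result of this function to insert_rows_json instead of data_dicts.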

You have to be careful: if your XML file is too big, it is not recommended to load heavy elements in an Airflow worker node.

Upvotes: 1

Aswin A

Reputation: 99

You can use some online tools to verify that the JSON output generated from the XML looks OK (https://www.utilities-online.info/xmltojson).

For example, the XML below

<?xml version="1.0"?>
<customers>
    <customer id="55000">
        <name>Charter Group</name>
        <address>
            <street>100 Main</street>
            <city>Framingham</city>
            <state>MA</state>
            <zip>01701</zip>
        </address>
        <address>
            <street>720 Prospect</street>
            <city>Framingham</city>
            <state>MA</state>
            <zip>01701</zip>
        </address>
        <address>
            <street>120 Ridge</street>
            <state>MA</state>
            <zip>01760</zip>
        </address>
    </customer>
</customers>

will generate JSON in this format

{
    "customers": {
        "customer": {
            "-id": "55000",
            "name": "Charter Group",
            "address": [
                {
                    "street": "100 Main",
                    "city": "Framingham",
                    "state": "MA",
                    "zip": "01701"
                },
                {
                    "street": "720 Prospect",
                    "city": "Framingham",
                    "state": "MA",
                    "zip": "01701"
                },
                {
                    "street": "120 Ridge",
                    "state": "MA",
                    "zip": "01760"
                }
            ]
        }
    }
}

There are multiple methods by which you can do the XML to JSON conversion in Python: you can use xmltodict, xml.etree.ElementTree, or minidom, and performance will differ between libraries. If it is a relatively small XML file, then it is okay to use xmltodict. If the XML file is too large, then you should consider running your Python script on a Compute Engine instance, as the conversion process will be resource intensive; for this you can use the Airflow Google Compute Engine Operators (https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/operator/gcp/compute.html) to start and stop your GCE instance.
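As an illustration of the ElementTree route, a minimal sketch that converts the sample customers XML above (element and attribute names are taken from that example, not from any real schema):

    import json
    import xml.etree.ElementTree as ET

    tree = ET.parse("xml_file.xml")
    root = tree.getroot()  # <customers>

    rows = []
    for customer in root.findall("customer"):
        rows.append({
            "id": customer.get("id"),           # XML attribute
            "name": customer.findtext("name"),  # child element text
            "address": [
                {child.tag: child.text for child in address}
                for address in customer.findall("address")
            ],
        })

    with open("data.json", "w") as json_file:
        json.dump(rows, json_file)

Unlike xmltodict, you control the output shape here, which makes it easier to produce rows that already match a BigQuery schema.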

You can specify the XML file path in the Python script (if the file is present in GCS, then the path becomes the GCS path where the file is located). If you are using Airflow, then you can use the GoogleCloudStorageToBigQueryOperator (https://airflow.readthedocs.io/en/1.10.7/howto/operator/gcp/gcs.html) to load the generated JSON to BigQuery.
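If the file sits in GCS, a minimal sketch of reading it directly from the bucket inside the Python script (the bucket and object names below are placeholders, assuming the google-cloud-storage client):

    import xmltodict
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("your-bucket")            # placeholder bucket name
    xml_blob = bucket.blob("path/to/xml_file.xml")   # placeholder object path

    # Download the XML content as a string and parse it without writing to local disk.
    data_dict = xmltodict.parse(xml_blob.download_as_text())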

To summarise:

  • Load the file from GCS into the Python script
  • Dump the generated JSON file to GCS, or upload the generated JSON data (the data held in a Python variable) directly to BigQuery from the Python utility script using the available BigQuery Python APIs (see the sketch after this list)
  • If you are saving the generated JSON file to GCS, then use the GoogleCloudStorageToBigQueryOperator to load the data into BigQuery
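For the second bullet, a minimal sketch of uploading the in-memory JSON data straight to BigQuery with the Python client (the table name is a placeholder, and rows is assumed to already be a list of dicts whose keys match the table's columns):

    from google.cloud import bigquery

    client = bigquery.Client()

    # rows must be a list of dicts whose keys match the table's column names.
    rows = [{"name": "Charter Group", "city": "Framingham", "zip": "01701"}]

    # Runs a load job; unlike insert_rows_json this is not a streaming insert.
    job = client.load_table_from_json(rows, "your_dataset.your_table")
    job.result()  # wait for the load to finish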

I will share a Python snippet that will work (a basic Python script for XML to JSON conversion):

    import json
    import xmltodict

    with open("xml_file.xml") as xml_file:
        data_dict = xmltodict.parse(xml_file.read())

    json_data = json.dumps(data_dict)

    with open("data.json", "w") as json_file:
        json_file.write(json_data)
        json_file.close()


Upvotes: 0
