Reputation: 5
I am trying to write an Airflow DAG which will load an .XML file to BigQuery using a Python method, but I believe that it needs to be converted into .json for it to work.
I have written the following code to convert an .XML file to .json, but I am unsure if this is correct, and how I can write a Python method to achieve this:
import json, xmltodict

def load_xml_to_bq():
    # Parse the XML into a dict, then serialize it to a JSON string
    with open("xml_file.xml") as xml_file:
        data_dict = xmltodict.parse(xml_file.read())
    json_data = json.dumps(data_dict)
    # Write the JSON string out to a file
    with open("data.json", "w") as json_file:
        json_file.write(json_data)
Additionally, does the .XML file need to be loaded from a GCS bucket for it to be inserted into BigQuery? Furthermore, would the following code need to be added to my function for the .XML to be inserted into BigQuery?
client = bigquery.Client()
client.insert_rows_json(f'{dataset}.{table}', dec_sale_list)
Thanks - any guidance on how to accomplish this would be appreciated; I feel that I have some of the correct concepts, but I am not sure what I need to add or remove to do this.
Upvotes: 0
Views: 1248
Reputation: 6572
You can also use the 2 following solutions.
Solution 1 :
- A PythonOperator to transform your XML file to JSON with pure Python code and a lib called xmltodict, then upload the resulting JSON file to GCS with the Python storage client
- A GCSToBigQueryOperator to load the JSON file from GCS to BigQuery
dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def transform_xml_file_to_json():
    import json, xmltodict
    from google.cloud import storage

    # Parse the XML into a dict, then serialize it to a JSON string
    with open(xml_file_path_gcs) as xml_file:
        data_dict = xmltodict.parse(xml_file.read())
    json_data = json.dumps(data_dict)

    # Upload the JSON string to GCS; json_data is already serialized,
    # so it must not be passed through json.dumps a second time
    client = storage.Client()
    bucket = client.get_bucket("your-bucket")
    blob = bucket.blob('your_file.json')
    blob.upload_from_string(data=json_data, content_type='application/json')
with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:
    xml_to_json = PythonOperator(
        task_id="transform_xml_to_json",
        python_callable=transform_xml_file_to_json
    )

    gcs_to_bq = GCSToBigQueryOperator(
        task_id="gcs_to_bq",
        bucket="your-bucket",
        source_objects=['*.json'],
        destination_project_dataset_table="your_table",
        schema_object=f"dags/schema/creditsafe/{data_type}.json",
        source_format="NEWLINE_DELIMITED_JSON",
        # ... other arguments
    )

    xml_to_json >> gcs_to_bq
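One caveat: GCSToBigQueryOperator with source_format="NEWLINE_DELIMITED_JSON" expects one JSON object per line, while json.dumps on the whole xmltodict result produces a single nested object. Here is a minimal sketch of one way to emit newline-delimited JSON instead; the "root" and "record" element names are placeholders for whatever repeated element your real XML uses:

import json
import xmltodict

def xml_to_ndjson(xml_path: str) -> str:
    # Parse the whole file, then emit one JSON object per repeated element
    with open(xml_path) as f:
        parsed = xmltodict.parse(f.read())
    records = parsed["root"]["record"]  # placeholder element names
    if isinstance(records, dict):
        # xmltodict returns a single dict (not a list) when there is
        # exactly one repeated element
        records = [records]
    return "\n".join(json.dumps(r) for r in records)

You could then pass the returned string to blob.upload_from_string in place of json_data above.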
Solution 2 :
All the work in the PythonOperator :
- Transform the XML file to a list of Dicts with pure Python code and the xmltodict lib
- Load the Dicts into BigQuery with the Python client and the insert_rows_json method

dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'
def load_xml_file_to_bq():
    import xmltodict
    from google.cloud import bigquery

    # Parse the XML into Python dicts
    with open(xml_file_path_gcs) as xml_file:
        data_dicts = xmltodict.parse(xml_file.read())

    # Check if you have to transform data_dicts to the expected list of
    # Dicts: each Dict needs to match exactly the schema of the BQ table
    client = bigquery.Client()
    client.insert_rows_json('dataset.table', data_dicts)
with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:
    # Name the task variable differently from the callable to avoid shadowing it
    load_xml_file_to_bq_task = PythonOperator(
        task_id="load_xml_file_to_bq",
        python_callable=load_xml_file_to_bq
    )

    load_xml_file_to_bq_task
Check if you have to transform data_dicts to the expected list of Dicts: each Dict in the list needs to match exactly the schema of the BigQuery table.
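For example, here is a minimal sketch of that reshaping step, assuming a hypothetical XML file with repeated customer elements and a table whose schema has matching id and name columns; adapt the element names and fields to your case:

import xmltodict
from google.cloud import bigquery

def load_customers(xml_path: str, table_id: str) -> None:
    with open(xml_path) as f:
        parsed = xmltodict.parse(f.read())
    customers = parsed["customers"]["customer"]  # hypothetical element names
    if isinstance(customers, dict):
        customers = [customers]  # xmltodict yields a dict for a single element
    # Keep only the fields present in the (assumed) table schema;
    # xmltodict prefixes XML attributes with '@' by default
    rows = [{"id": c["@id"], "name": c["name"]} for c in customers]
    client = bigquery.Client()
    errors = client.insert_rows_json(table_id, rows)  # returns [] on success
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")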
You have to be careful: if your XML file is too big, it's not recommended to load heavy elements in a node, since the whole parsed tree is held in the worker's memory.
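For such cases, xmltodict also offers a streaming mode in which each element is handed to a callback and then discarded instead of being kept in memory. A minimal sketch; the file name and the item_depth value are assumptions to adjust to your XML structure:

import xmltodict

def handle_item(path, item):
    # Called once per element at item_depth; 'path' holds the (tag, attrs)
    # pairs from the root down to this element. Buffer or insert 'item'
    # here, then return True so parsing continues.
    print(path[-1][0], item)
    return True

with open("your_xml_file.xml", "rb") as f:  # assumed file name
    # With item_depth/item_callback, xmltodict discards each element after
    # the callback runs, so memory use stays flat for very large files
    xmltodict.parse(f, item_depth=2, item_callback=handle_item)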
Upvotes: 1
Reputation: 99
You can use some online tools to verify that the JSON output generated from the XML looks OK (https://www.utilities-online.info/xmltojson).
For example, the XML below
<?xml version="1.0"?>
<customers>
    <customer id="55000">
        <name>Charter Group</name>
        <address>
            <street>100 Main</street>
            <city>Framingham</city>
            <state>MA</state>
            <zip>01701</zip>
        </address>
        <address>
            <street>720 Prospect</street>
            <city>Framingham</city>
            <state>MA</state>
            <zip>01701</zip>
        </address>
        <address>
            <street>120 Ridge</street>
            <state>MA</state>
            <zip>01760</zip>
        </address>
    </customer>
</customers>
will generate JSON in this format
{
    "customers": {
        "customer": {
            "-id": "55000",
            "name": "Charter Group",
            "address": [
                {
                    "street": "100 Main",
                    "city": "Framingham",
                    "state": "MA",
                    "zip": "01701"
                },
                {
                    "street": "720 Prospect",
                    "city": "Framingham",
                    "state": "MA",
                    "zip": "01701"
                },
                {
                    "street": "120 Ridge",
                    "state": "MA",
                    "zip": "01760"
                }
            ]
        }
    }
}
There are multiple methods by which you can do the XML to JSON conversion in Python: you can use xmltodict, xml.etree.ElementTree, or minidom, and performance will differ between libraries. If it's a relatively small XML file, then it's okay to use xmltodict. If the XML file is too large, then you need to consider running your Python script from Compute Engine, as the conversion process will be resource intensive; for this you can use the Airflow Google Compute Engine operators (https://airflow.apache.org/docs/apache-airflow/1.10.14/howto/operator/gcp/compute.html) to start and stop your GCE instance.
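As an illustration of the stdlib route, here is a minimal sketch using xml.etree.ElementTree on the customers example above; the flattened output shape is an assumption, not the exact format the online tool produces:

import json
import xml.etree.ElementTree as ET

def etree_xml_to_json(xml_path: str, json_path: str) -> None:
    tree = ET.parse(xml_path)
    rows = []
    for customer in tree.getroot().iter("customer"):
        rows.append({
            "id": customer.get("id"),           # XML attribute
            "name": customer.findtext("name"),  # child element text
            "address": [
                # Collect each <address> child's sub-elements as key/value pairs
                {child.tag: child.text for child in address}
                for address in customer.findall("address")
            ],
        })
    with open(json_path, "w") as json_file:
        json.dump(rows, json_file)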
You can specify the XML file path in the Python script (if the file is present in GCS, then the path becomes the GCS path where the file is present). If you are using Airflow, then you can use GoogleCloudStorageToBigQueryOperator (https://airflow.readthedocs.io/en/1.10.7/howto/operator/gcp/gcs.html) to load the generated JSON to BigQuery.
To summarise, I will share a basic Python snippet that will work for the XML to JSON conversion:
import json
import xmltodict

# Parse the XML into a dict, then write it out as JSON
with open("xml_file.xml") as xml_file:
    data_dict = xmltodict.parse(xml_file.read())

json_data = json.dumps(data_dict)

with open("data.json", "w") as json_file:
    json_file.write(json_data)
Upvotes: 0