Reputation: 10441
It seems that moving data from GCS to MongoDB is not common, since there is not much documentation on it. We have the following task, which we pass as the python_callable
to a PythonOperator - the task moves data from BigQuery into GCS as JSON:
from google.cloud import bigquery

def transfer_gcs_to_mongodb(table_name):
    # Connect to BigQuery
    client = bigquery.Client()
    bucket_name = "our-gcs-bucket"
    project_id = "ourproject"
    dataset_id = "ourdataset"
    destination_uri = f'gs://{bucket_name}/{table_name}.json'
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_name)
    configuration = bigquery.job.ExtractJobConfig()
    configuration.destination_format = 'NEWLINE_DELIMITED_JSON'
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=configuration,
        location="US",
    )  # API request
    extract_job.result()  # Waits for the job to complete.
    print("Exported {}:{}.{} to {}".format(project_id, dataset_id, table_name, destination_uri))
This task successfully gets data into GCS. However, we are now stuck on how to run mongoimport correctly to get this data into MongoDB. In particular, it seems that mongoimport cannot point to the file in GCS; the file has to be downloaded locally first and then imported into MongoDB. How should this be done in Airflow? Should we write a shell script that downloads the JSON from GCS and then runs mongoimport with the correct uri and all the correct flags, along the lines of the sketch below? Or is there another way to run mongoimport in Airflow that we are missing?
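If it matters, a minimal sketch of the shell-script approach we have in mind (the bucket, collection name, and connection URI are placeholders) would be something like:

from airflow.operators.bash import BashOperator

import_task = BashOperator(
    task_id='mongoimport_from_gcs',
    bash_command=(
        # Copy the exported file down from GCS, then import it.
        'gsutil cp gs://our-gcs-bucket/mytable.json /tmp/mytable.json && '
        'mongoimport --uri "mongodb://localhost:27017/ourdb" '
        '--collection mytable --file /tmp/mytable.json'
    ),
)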
Upvotes: 1
Views: 1124
Reputation: 15979
You don't need to write a shell script to download from GCS. You can simply use the GCSToLocalFilesystemOperator, then open the file and write it to Mongo using the insert_many function of the MongoHook.
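A minimal sketch of the download step (the task_id, bucket, object, and local file names are placeholders) could be:

from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

download_file = GCSToLocalFilesystemOperator(
    task_id='download_from_gcs',
    bucket='our-gcs-bucket',      # placeholder bucket name
    object_name='mytable.json',   # the object the BigQuery extract job wrote
    filename='file.json',         # local path the insert step will read
)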
For the insert step, I didn't test it, but it should be something like:
import json
from airflow.providers.mongo.hooks.mongo import MongoHook

mongo = MongoHook(conn_id=mongo_conn_id)
with open('file.json') as f:
    # The export is newline-delimited JSON: one document per line.
    file_data = [json.loads(line) for line in f if line.strip()]
mongo.insert_many('our_collection', file_data)  # placeholder collection name
This is for a pipeline of: BigQuery -> GCS -> Local File System -> MongoDB.
You can also do it in memory, as BigQuery -> GCS -> MongoDB, if you prefer; see the sketch below.
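Also untested, but assuming GCSHook.download returns the object's bytes when no local filename is given, an in-memory version (the function and its arguments are placeholders) could be:

import json
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.mongo.hooks.mongo import MongoHook

def gcs_to_mongo(bucket_name, object_name, collection_name):
    # Pull the exported newline-delimited JSON straight into memory.
    raw = GCSHook().download(bucket_name=bucket_name, object_name=object_name)
    docs = [json.loads(line) for line in raw.decode('utf-8').splitlines() if line]
    MongoHook(conn_id='mongo_default').insert_many(collection_name, docs)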
Upvotes: 2