Canovice

Reputation: 10441

mongoimport JSON from Google Cloud Storage in an Airflow task

It seems that moving data from GCS to MongoDB is not common, since there is not much documentation on this. We have the following task that we pass as the python_callable to a PythonOperator - this task moves data from BigQuery into GCS as JSON:

from google.cloud import bigquery


def transfer_gcs_to_mongodb(table_name):
    # connect
    client = bigquery.Client()
    bucket_name = "our-gcs-bucket"
    project_id = "ourproject"
    dataset_id = "ourdataset"

    destination_uri = f'gs://{bucket_name}/{table_name}.json'
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_name)

    configuration = bigquery.job.ExtractJobConfig()
    configuration.destination_format = 'NEWLINE_DELIMITED_JSON'

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=configuration,
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print("Exported {}:{}.{} to {}".format(project_id, dataset_id, table_name, destination_uri))

This task is successfully getting data into GCS. However, we are now stuck on how to run mongoimport correctly to get this data into MongoDB. In particular, it seems that mongoimport cannot point to the file in GCS; the file has to be downloaded locally first and then imported into MongoDB.

How should this be done in Airflow? Should we write a shell script that downloads the JSON from GCS and then runs mongoimport with the correct URI and all the correct flags? Or is there another way to run mongoimport in Airflow that we are missing?

Upvotes: 1

Views: 1124

Answers (1)

Elad Kalif

Reputation: 15979

You don't need to write a shell script to download from GCS. You can simply use the GCSToLocalFilesystemOperator, then open the file and write it to MongoDB using the insert_many method of the MongoHook.

I didn't test it, but it should be something like:

import json
from airflow.providers.mongo.hooks.mongo import MongoHook

mongo = MongoHook(conn_id=mongo_conn_id)
with open('file.json') as f:
    # newline-delimited JSON export: one document per line; 'our_collection' is a placeholder
    file_data = [json.loads(line) for line in f]
mongo.insert_many(mongo_collection='our_collection', docs=file_data)
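
The download step before that would be a GCSToLocalFilesystemOperator task. A rough, untested sketch - the task, bucket, and object names here are placeholders:

from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

# Placeholder task: pulls the exported JSON from GCS onto the worker's local disk
download_from_gcs = GCSToLocalFilesystemOperator(
    task_id='download_from_gcs',
    bucket='our-gcs-bucket',
    object_name='our_table.json',
    filename='file.json',
)

The open('file.json') call in the snippet above then reads the file this task downloaded.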

This is for a pipeline of: BigQuery -> GCS -> Local File System -> MongoDB.

You can also do it in memory as: BigQuery -> GCS -> MongoDB if you prefer.
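
For the in-memory version, an untested sketch could use the GCSHook to read the object bytes directly instead of writing a local file. The bucket, connection, and collection names are placeholders, and this assumes the export is newline-delimited JSON as in the question:

import json

from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.mongo.hooks.mongo import MongoHook


def transfer_gcs_to_mongodb_in_memory(table_name):
    # GCSHook.download returns the object bytes when no local filename is given
    data = GCSHook().download(bucket_name='our-gcs-bucket', object_name=f'{table_name}.json')
    # One JSON document per line in the newline-delimited export
    docs = [json.loads(line) for line in data.decode('utf-8').splitlines() if line]
    # 'our_collection' is a placeholder for the target MongoDB collection
    MongoHook(conn_id='mongo_default').insert_many(mongo_collection='our_collection', docs=docs)

Note that this loads the whole export into memory, so for large tables the local-file route is probably safer.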

Upvotes: 2
