sdave

Reputation: 568

Loading multiple files from Cloud Storage to BigQuery in different tables

I am new to GCP. I am able to get one file from my VM into GCS and then transfer it to BigQuery. How do I transfer multiple files from GCS to BigQuery? I know a wildcard URI is part of the solution, but what other changes are also needed in the code below?

def hello_gcs(event, context):
    from google.cloud import bigquery
    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    table_id = "test_project.test_dataset.test_Table"
  
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = "gs://test_bucket/*.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print(f"Processing file: {file['name']}.")

Since there could be multiple uploads, I cannot hard-code a specific table name or file name. Is it possible to do this task automatically?

This function is triggered by Pub/Sub whenever there is a new file in the GCS bucket. Thanks.

Upvotes: 0

Views: 4659

Answers (4)

Kabilan Mohanraj

Reputation: 1906

To load multiple GCS files into multiple BigQuery tables in a single Cloud Function invocation, you would need to list those files and then iterate over them, creating a load job for each file, just as you have done for one. But doing all that work inside a single function call somewhat defeats the purpose of using Cloud Functions.

If your requirements do not force you to do otherwise, you can leverage the power of Cloud Functions and let a single function be triggered by each of those files as they are added to the bucket, since it is an event-driven function. Please refer to https://cloud.google.com/functions/docs/writing/background#cloud-storage-example. The function is triggered every time the specified activity occurs, and event metadata describing the object is passed along with it.
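For reference, a storage-triggered background function receives the object's metadata in the event argument. A minimal sketch of its shape (the field values below are illustrative; see the linked docs for the full list of fields):

# Illustrative shape of the `event` dict for a google.storage.object.finalize
# trigger (values are made up; the real payload contains more fields).
event = {
    "bucket": "test_bucket",
    "name": "sales_2021_06.csv",        # object name -- used to build the load URI
    "contentType": "text/csv",
    "size": "2048",
    "timeCreated": "2021-06-01T12:00:00.000Z",
}

# The single file that triggered this invocation:
uri = f"gs://{event['bucket']}/{event['name']}"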

So, in your application, rather than taking the entire bucket contents in the URI, we can take the name of the file that triggered the event and load only that file into a BigQuery table, as shown in the code sample below.

Here is how you can resolve the issue. Try the following changes in your code:

  • You can extract the details about the event, and about the file which triggered it, from the Cloud Function event dictionary. In your case, we can get the file name as event['name'] and update the uri variable.

  • Generate a new unique table_id (here, as an example, the table_id is the same as the file name). You can use other schemes to generate unique table names as required.

Refer to the code below:

def hello_gcs(event, context):
    from google.cloud import bigquery

    client = bigquery.Client()  # Construct a BigQuery client object.

    print(f"Processing file: {event['name']}.")  # Name of the file that triggered the function.

    if ".csv" in event['name']:
        # BigQuery load job config.
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        )

        file_name = event['name'].split('.')
        table_id = "<project_id>.<dataset_name>." + file_name[0]  # Generating a new table ID for each file.

        uri = "gs://<bucket_name>/" + event['name']
        load_job = client.load_table_from_uri(
            uri, table_id, job_config=job_config
        )  # Make an API request.
        load_job.result()  # Waits for the job to complete.
        destination_table = client.get_table(table_id)  # Make an API request.
        print("Table {} uploaded.".format(table_id))

Upvotes: 0

loadbox

Reputation: 656

To transfer multiple files from GCS to BigQuery, you can simply loop through all the files. A sample of working code with comments is below. The event and context function arguments are handled by the Cloud Functions runtime by default, so there is no need to modify that part. Alternatively, you can simplify the code by leveraging event instead of a loop.

def hello_gcs(event, context):
    import re
    from google.cloud import storage
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    bq_client = bigquery.Client()
    bucket = storage.Client().bucket("bucket-name")
    for blob in bucket.list_blobs(prefix="folder-name/"):
        if ".csv" in blob.name:  # Check for CSV blobs, as list_blobs also returns the folder name.
            job_config = bigquery.LoadJobConfig(
                autodetect=True,
                skip_leading_rows=1,
                source_format=bigquery.SourceFormat.CSV,
            )
            csv_filename = re.findall(r".*/(.*).csv", blob.name)  # Extract the file name for BQ's table ID.
            bq_table_id = "project-name.dataset-name." + csv_filename[0]  # Determine the table name.

            try:  # Check if the table already exists and skip uploading it.
                bq_client.get_table(bq_table_id)
                print("Table {} already exists. Not uploaded.".format(bq_table_id))
            except NotFound:  # If the table is not found, upload it.
                uri = "gs://bucket-name/" + blob.name
                print(uri)
                load_job = bq_client.load_table_from_uri(
                    uri, bq_table_id, job_config=job_config
                )  # Make an API request.
                load_job.result()  # Waits for the job to complete.
                destination_table = bq_client.get_table(bq_table_id)  # Make an API request.
                print("Table {} uploaded.".format(bq_table_id))

Upvotes: 1

al-dann

Reputation: 2725

Correct me if I am wrong: I understand that your Cloud Function is triggered by a finalize event (Google Cloud Storage Triggers) when a new file (or object) appears in a storage bucket. That means there is one event for each "new" object in the bucket, and thus at least one invocation of the Cloud Function for every object.

The link above has an example of the data that comes in the event dictionary. There is plenty of information there, including details of the object (file) to be loaded.

You might like to have some configuration with a mapping between a file name pattern and a target BigQuery table for data loading, for example. Using that map, you can decide which table should be used for loading. Or you may have some other mechanism for choosing the target table.
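As an illustration, such a mapping could be a small list of file-name patterns checked against event['name']. This is only a sketch; the patterns and table IDs below are made up:

import re

# Hypothetical mapping from file-name patterns to target BigQuery tables.
TABLE_MAP = [
    (re.compile(r"^sales_.*\.csv$"), "my-project.my_dataset.sales"),
    (re.compile(r"^orders_.*\.csv$"), "my-project.my_dataset.orders"),
]

def target_table_for(object_name):
    """Return the table ID for the first matching pattern, or None to skip the file."""
    for pattern, table_id in TABLE_MAP:
        if pattern.match(object_name):
            return table_id
    return None

# In the Cloud Function:
#   table_id = target_table_for(event['name'])
#   if table_id is None:
#       return  # no mapping configured for this file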

Some other things to think about:

  1. Exception handling - what are you going to do with the file if the data is not loaded (for any reason)? Who is to be informed, and how? What is to be done to (correct the source data or the target table and) repeat the loading, etc.?
  2. What happens if the loading takes more time than the Cloud Function timeout (a maximum of 540 seconds at the present moment)?
  3. What happens if there is more than one Cloud Function invocation from one finalize event, or from different events but for semantically the same source file (repeated data, duplication, etc.)? One common guard for this is sketched below.

You don't have to answer me; just think about such cases if you have not done so already.
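On point 3, one common guard, sketched here under the assumption of a storage-triggered function, is to derive a deterministic BigQuery job ID from the object's bucket, name, and generation, so a duplicate invocation for the same file reuses the existing load job instead of loading the data twice:

import re
from google.api_core.exceptions import Conflict
from google.cloud import bigquery

def load_once(client, event, table_id, job_config):
    # Deterministic job ID: the same object (and generation) always maps to the
    # same job ID, and BigQuery rejects a second job with an existing ID.
    raw = "load_{}_{}_{}".format(event["bucket"], event["name"], event.get("generation", "0"))
    job_id = re.sub(r"[^a-zA-Z0-9_-]", "_", raw)[:1024]  # job IDs allow letters, digits, _ and -

    uri = "gs://{}/{}".format(event["bucket"], event["name"])
    try:
        job = client.load_table_from_uri(uri, table_id, job_id=job_id, job_config=job_config)
    except Conflict:
        # A previous invocation already created this job; just wait for it.
        job = client.get_job(job_id)
    return job.result()

# Usage inside the Cloud Function body:
#   client = bigquery.Client()
#   load_once(client, event, table_id, job_config)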

Upvotes: 1

Adam

Reputation: 73

If your data source is GCS and your destination is BQ, you can use the BigQuery Data Transfer Service to ETL your data into BQ. Every transfer job targets a certain table, and you can choose whether to append to or overwrite the data in that table.

You can schedule this job as well: daily, weekly, etc.
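A minimal sketch of creating such a transfer with the Python client, assuming the google-cloud-bigquery-datatransfer library and illustrative project, dataset, and bucket names (check the Data Transfer Service docs for the exact parameters supported by the Cloud Storage source):

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="test_dataset",                 # illustrative dataset
    display_name="Daily GCS CSV load",
    data_source_id="google_cloud_storage",
    params={
        "data_path_template": "gs://test_bucket/*.csv",    # illustrative bucket
        "destination_table_name_template": "test_table",
        "file_format": "CSV",
        "skip_leading_rows": "1",
    },
    schedule="every 24 hours",
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path("test_project"),
    transfer_config=transfer_config,
)
print("Created transfer config: {}".format(transfer_config.name))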

Upvotes: 0
