Timothy-Ryan25

Reputation: 21

Cloud Functions: How to Insert Data into BigQuery when a file is Uploaded into a GCS Bucket?

I am new to GCP Cloud Functions, and I am trying to add a function to my existing code, which will insert data into BigQuery when a specific file is uploaded into my GCS Bucket.

I am unsure how to write a function to accomplish this task. For example, do I need to write a function that checks whether the file exists first?

So far, I have written the following Cloud Function, which inserts data into BigQuery:

from google.cloud import bigquery
from google.cloud import storage

def conn_to_bigquery(request):

    client = bigquery.Client()

    query = """
    """

    query_job = client.query(query)

Check if the file exists:

name = 'file_i_want_to_check.txt'
bucket_name = 'my_bucket_name'

storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
stats = storage.Blob(bucket=bucket, name=name).exists(storage_client)
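
Putting the two fragments together, I imagine something roughly like the sketch below (the bucket and file names are placeholders, and the query body is omitted as above), but I am not sure whether this existence check is the right approach:

from google.cloud import bigquery
from google.cloud import storage

def conn_to_bigquery(request):
    # Placeholder names for illustration only
    name = 'file_i_want_to_check.txt'
    bucket_name = 'my_bucket_name'

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # Only run the query if the file is already in the bucket
    if storage.Blob(bucket=bucket, name=name).exists(storage_client):
        client = bigquery.Client()
        query = """
        """
        query_job = client.query(query)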

Upvotes: 1

Views: 2180

Answers (2)

Gopi Kiran

Reputation: 11

Try the code below, and keep the indentation as shown:

def hello_gcs(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    # Imports
    from google.cloud import bigquery

    client = bigquery.Client()

    file = event
    print(file)
    print(f"Processing file: {file['name']}.")
    print(f"Bucket name: {file['bucket']}.")

    file_name = file['name']      # name of the file that triggered the Cloud Function
    bucket_name = file['bucket']  # name of the bucket it was uploaded to

    table_id = "<your_projectid>.<your_dataset>.<your_tableid>"  # fill in the table ID created in the previous step
    uri = "gs://{}/{}".format(bucket_name, file_name)  # the source URI is built from the bucket and file names

    # Fill in the required load job configuration
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 0
    job_config.source_format = bigquery.SourceFormat.CSV

    destination_table = client.get_table(table_id)
    job_config.schema = destination_table.schema

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)  # Make an API request.

    load_job.result()  # Waits for the job to complete.
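
This function expects a Cloud Storage trigger rather than an HTTP trigger. As a rough example (1st gen Cloud Functions; the runtime and bucket name below are placeholders), the deploy command would look something like:

gcloud functions deploy hello_gcs \
  --runtime python310 \
  --trigger-resource YOUR_STORAGE_BUCKET \
  --trigger-event google.storage.object.finalize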


Upvotes: 0

Mazlum Tosun

Reputation: 6572

You can specify a Cloud Storage trigger when you deploy a Cloud Function:

gcloud functions deploy YOUR_FUNCTION_NAME \
--gen2 \
--trigger-event-filters="type=EVENT_TYPE" \
--trigger-event-filters="bucket=YOUR_STORAGE_BUCKET" \
...

This example uses Cloud Functions (2nd gen). For a file upload, you can use the Eventarc event type google.cloud.storage.object.v1.finalized; see the Cloud Storage trigger documentation for the other event types.
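
For example, with that event type filled in (the function name, runtime, region and bucket below are just placeholders), the full deploy command would look something like this:

gcloud functions deploy conn_to_bigquery \
  --gen2 \
  --runtime python311 \
  --region europe-west1 \
  --source . \
  --entry-point conn_to_bigquery \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=YOUR_STORAGE_BUCKET"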

For the Cloud Function itself, you can use a CloudEvent function:

import functions_framework
from google.cloud import bigquery

# Register a CloudEvent function with the Functions Framework
@functions_framework.cloud_event
def conn_to_bigquery(cloud_event):
    # Your code here
    # Access the CloudEvent data payload via cloud_event.data

    client = bigquery.Client()

    query = """
    """

    query_job = client.query(query)
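
If the goal is to load the uploaded file itself into BigQuery rather than run a fixed query, the event payload carries the bucket and object name. A minimal sketch, assuming the file is a CSV and using a placeholder table ID:

import functions_framework
from google.cloud import bigquery

@functions_framework.cloud_event
def load_uploaded_file(cloud_event):
    # The Cloud Storage event payload contains the bucket and object name
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    uri = f"gs://{bucket_name}/{file_name}"
    table_id = "your_project.your_dataset.your_table"  # placeholder table ID

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
    )

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # wait for the load job to finish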

Upvotes: 2
