Reputation: 21
I am new to GCP Cloud Functions, and I am trying to add a function to my existing code that will insert data into BigQuery when a specific file is uploaded to my GCS bucket.
I am unsure how to write a function to accomplish this. For example, do I need to write a function that checks whether the file exists first?
So far, I have written the following Cloud Function, which inserts data into BigQuery:
from google.cloud import bigquery

def conn_to_bigquery(request):
    client = bigquery.Client()
    query = """
    """
    query_job = client.query(query)
Check if the file exists:

from google.cloud import storage

name = 'file_i_want_to_check.txt'
storage_client = storage.Client()
bucket_name = 'my_bucket_name'
bucket = storage_client.bucket(bucket_name)
stats = storage.Blob(bucket=bucket, name=name).exists(storage_client)
Upvotes: 1
Views: 2180
Reputation: 11
Try the code below; the indentation matters, so keep it as shown.
def hello_gcs(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    # imports
    from google.cloud import bigquery

    client = bigquery.Client()
    file = event
    print(file)
    print(f"Processing file: {file['name']}.")
    print(f"Bucket name: {file['bucket']}.")
    file_name = file['name']      # the file name that triggered the Cloud Function
    bucket_name = file['bucket']  # the bucket name
    table_id = "<your_projectid>.<your_dataset>.<your_tableid>"  # fill in the table_id created in the previous step
    uri = "gs://{}/{}".format(bucket_name, file_name)  # the URI is built automatically from the bucket name and file name

    # Fill in the required configuration details
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 0
    job_config.source_format = bigquery.SourceFormat.CSV
    destination_table = client.get_table(table_id)
    job_config.schema = destination_table.schema

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)  # Make an API request.
    load_job.result()  # Waits for the job to complete.
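For this (event, context) signature to work, the function has to be deployed with a Cloud Storage trigger so the event payload carries the file metadata. A minimal sketch of a 1st-gen deploy command, assuming a hypothetical bucket name my_bucket_name and the Python 3.10 runtime:

gcloud functions deploy hello_gcs \
    --runtime python310 \
    --trigger-resource my_bucket_name \
    --trigger-event google.storage.object.finalize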
Upvotes: 0
Reputation: 6572
You can specify a Cloud Storage trigger when you deploy a Cloud Function:
gcloud functions deploy YOUR_FUNCTION_NAME \
--gen2 \
--trigger-event-filters="type=EVENT_TYPE" \
--trigger-event-filters="bucket=YOUR_STORAGE_BUCKET" \
...
This example uses Cloud Functions v2.
You can use the Eventarc event type google.cloud.storage.object.v1.finalized; check the link I shared above.
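For example, a sketch of a full deploy command with that event type filled in (the function name, runtime, region, and bucket below are placeholders to replace with your own values):

gcloud functions deploy conn_to_bigquery \
    --gen2 \
    --runtime=python311 \
    --region=us-central1 \
    --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
    --trigger-event-filters="bucket=my_bucket_name" \
    --source=.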
For the Cloud Function, you can use a CloudEvent function:
import functions_framework
from google.cloud import bigquery

# Register a CloudEvent function with the Functions Framework
@functions_framework.cloud_event
def conn_to_bigquery(cloud_event):
    # Your code here
    # Access the CloudEvent data payload via cloud_event.data
    client = bigquery.Client()
    query = """
    """
    query_job = client.query(query)
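To tie this back to the question, here is a minimal sketch of what the body could look like, reading the bucket and file name from the event payload and loading the file into BigQuery. The function name, table ID, and CSV settings below are assumptions to adapt:

import functions_framework
from google.cloud import bigquery

@functions_framework.cloud_event
def load_gcs_file_to_bigquery(cloud_event):
    # For google.cloud.storage.object.v1.finalized events, the payload
    # carries the bucket and object name of the uploaded file.
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    # Hypothetical placeholder; replace with your own table ID.
    table_id = "your-project.your_dataset.your_table"

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,  # assumes the uploaded file is a CSV
        autodetect=True,
        skip_leading_rows=1,  # assumes the CSV has a header row
    )
    load_job = client.load_table_from_uri(
        f"gs://{bucket_name}/{file_name}", table_id, job_config=job_config
    )
    load_job.result()  # wait for the load job to finish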
Upvotes: 2