Reputation: 568
I am new to GCP. I am able to get one file into GCS from my VM and then transfer it to BigQuery. How do I transfer multiple files from GCS to BigQuery? I know a wildcard URI is the solution, but what other changes are also needed in the code below?
def hello_gcs(event, context):
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    table_id = "test_project.test_dataset.test_Table"

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )

    uri = "gs://test_bucket/*.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print(f"Processing file: {event['name']}.")
Since there could be multiple uploads, I cannot hard-code a specific table name or file name. Is it possible to do this automatically?
This function is triggered by Pub/Sub whenever there is a new file in the GCS bucket. Thanks.
Upvotes: 0
Views: 4659
Reputation: 1906
To load multiple GCS files into multiple BigQuery tables in a single Cloud Function invocation, you would need to list those files and then iterate over them, creating a load job for each file, just as you have done for one. But doing all of that work inside a single function call somewhat defeats the purpose of using Cloud Functions.
If your requirements do not force you to do so, you can leverage the event-driven nature of Cloud Functions and let a single function be triggered by each file as it is added to the bucket. Please refer to https://cloud.google.com/functions/docs/writing/background#cloud-storage-example. The function is invoked every time the specified activity occurs, and it receives the corresponding event metadata.
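For reference, the event dictionary that a background function receives from a Cloud Storage finalize trigger carries the object's metadata. A rough illustration (the values below are made up):

# Illustrative shape of the event payload for a finalize trigger;
# the field values here are made up.
event = {
    "bucket": "test_bucket",
    "name": "uploads/sales_2021.csv",  # the object that triggered the function
    "contentType": "text/csv",
    "size": "2048",
    "timeCreated": "2021-09-30T12:00:00.000Z",
}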
So, in your application, rather than pointing the URI at the entire bucket, we can take the name of the file that triggered the event and load only that file into a BigQuery table, as shown in the code sample below.
Here is how you can resolve the issue. Try the following changes in your code.
You can extract the details about the event, and about the file that triggered it, from the Cloud Function's event dictionary. In your case, we can get the file name as event['name'] and update the uri variable accordingly.
Generate a new unique table_id (here, as an example, the table_id is the same as the file name). You can use other schemes to generate unique table names as required.
Refer to the code below.
def hello_gcs(event, context):
    from google.cloud import bigquery

    client = bigquery.Client()  # Construct a BigQuery client object.
    print(f"Processing file: {event['name']}.")  # Name of the file which triggered the function.

    if ".csv" in event['name']:
        # BigQuery load job config.
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        )

        file_name = event['name'].split('.')
        table_id = "<project_id>.<dataset_name>." + file_name[0]  # Generating a new table_id for each file.
        uri = "gs://<bucket_name>/" + event['name']

        load_job = client.load_table_from_uri(
            uri, table_id, job_config=job_config
        )  # Make an API request.

        load_job.result()  # Waits for the job to complete.

        destination_table = client.get_table(table_id)  # Make an API request.
        print("Table {} uploaded.".format(table_id))
Upvotes: 0
Reputation: 656
To transfer multiple files from GCS to BigQuery, you can simply loop through all the files. A sample of working code, with comments, is below.
I believe event and context (the function arguments) are handled by Google Cloud Functions by default, so there is no need to modify that part. Alternatively, you can simplify the code by leveraging event instead of a loop.
def hello_gcs(event, context):
    import re
    from google.cloud import storage
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    bq_client = bigquery.Client()
    bucket = storage.Client().bucket("bucket-name")

    for blob in bucket.list_blobs(prefix="folder-name/"):
        if ".csv" in blob.name:  # Check for CSV blobs, as list_blobs also returns the folder name.
            job_config = bigquery.LoadJobConfig(
                autodetect=True,
                skip_leading_rows=1,
                source_format=bigquery.SourceFormat.CSV,
            )
            csv_filename = re.findall(r".*/(.*).csv", blob.name)  # Extract the file name for BQ's table id.
            bq_table_id = "project-name.dataset-name." + csv_filename[0]  # Determine the table name.

            try:  # Check if the table already exists and skip uploading it.
                bq_client.get_table(bq_table_id)
                print("Table {} already exists. Not uploaded.".format(bq_table_id))
            except NotFound:  # If the table is not found, upload it.
                uri = "gs://bucket-name/" + blob.name
                print(uri)
                load_job = bq_client.load_table_from_uri(
                    uri, bq_table_id, job_config=job_config
                )  # Make an API request.
                load_job.result()  # Waits for the job to complete.
                destination_table = bq_client.get_table(bq_table_id)  # Make an API request.
                print("Table {} uploaded.".format(bq_table_id))
Upvotes: 1
Reputation: 2725
Correct me if I am wrong: I understand that your Cloud Function is triggered by a finalize event (Google Cloud Storage Triggers) when a new file (or object) appears in a storage bucket. That means there is one event for each "new" object in the bucket, and thus at least one invocation of the cloud function for every object.
The link above has an example of the data that comes in the event dictionary. There is plenty of information there, including details of the object (file) to be loaded.
You might like to have some configuration with a mapping between a file name pattern and a target BigQuery table for data loading, for example; see the sketch below. Using that map you will be able to decide which table should be used for loading. Or you may have some other mechanism for choosing the target table.
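As a purely hypothetical illustration of such a mapping (the patterns, project, and dataset names below are assumptions, not taken from your setup):

import re

# Hypothetical pattern -> table map: the first regex that matches the
# object name decides which BigQuery table receives the load.
TABLE_MAP = [
    (re.compile(r"^sales/.*\.csv$"), "my_project.my_dataset.sales"),
    (re.compile(r"^users/.*\.csv$"), "my_project.my_dataset.users"),
]

def pick_table(object_name):
    for pattern, table_id in TABLE_MAP:
        if pattern.match(object_name):
            return table_id
    return None  # No mapping found: skip or log the file.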
Some other things to think about: how you will handle data that arrives more than once for the same table, whether from a repeated finalize event or from different events that carry semantically the same source file (repeated data, duplications, etc.). No need to answer me; just think about such cases if you have not done so yet.
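As one hedged illustration of that concern (not something prescribed above), you can make each per-file load idempotent so that a re-delivered event or a re-uploaded file does not duplicate rows, for example by truncating the destination table on every load:

from google.cloud import bigquery

# Illustrative only: with WRITE_TRUNCATE, loading the same file twice
# replaces the table contents instead of appending duplicate rows.
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    skip_leading_rows=1,
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)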
Upvotes: 1
Reputation: 73
If your data source is GCS and your destination is BigQuery, you can use the BigQuery Data Transfer Service to ETL your data into BQ. Each transfer job targets a specific table, and you can choose whether new data is appended to or overwrites that table.
You can schedule the job as well: daily, weekly, etc.
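For illustration only, here is a minimal sketch of creating such a scheduled GCS-to-BigQuery transfer with the Python client for the Data Transfer Service. The project, dataset, bucket, and parameter values are assumptions; check the Cloud Storage transfer documentation for the exact parameters supported.

from google.cloud import bigquery_datatransfer

client = bigquery_datatransfer.DataTransferServiceClient()

project_id = "test_project"  # assumed project ID

# Assumed dataset, bucket, and parameter values for illustration.
transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="test_dataset",
    display_name="Daily GCS CSV load",
    data_source_id="google_cloud_storage",
    params={
        "data_path_template": "gs://test_bucket/*.csv",
        "destination_table_name_template": "test_Table",
        "file_format": "CSV",
        "skip_leading_rows": "1",
        "write_disposition": "APPEND",
    },
    schedule="every 24 hours",
)

transfer_config = client.create_transfer_config(
    parent=f"projects/{project_id}",
    transfer_config=transfer_config,
)
print(f"Created transfer config: {transfer_config.name}")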
Upvotes: 0