Reputation: 97
I have multiple files in GCS whose file names contain a timestamp. I have to read the data from each file and insert it into a BigQuery table, with the timestamp as one of the columns. How can I achieve that in a DAG?
Upvotes: 0
Views: 585
Reputation: 6572
You can create a Cloud Function triggered by a cron job from Cloud Scheduler:
import json

from google.cloud import bigquery
from google.cloud import storage

bucket_name = 'mybucket'
folder = '/projects/bigquery/download/shakespeare/'
delimiter = '/'
file = 'shakespeare'

# Retrieve all blobs with a prefix matching the file.
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

# List the blobs in the folder (the delimiter excludes sub-folders inside the bucket).
blobs = bucket.list_blobs(prefix=file, delimiter=delimiter)

client = bigquery.Client()

for blob in blobs:
    print(blob.name)
    element_as_str = blob.download_as_string()  # or download_as_text()
    element_as_dict = json.loads(element_as_str)

    # Then parse the timestamp from the filename: blob.name
    # Use the BigQuery Python client to write the data, with the timestamp as one
    # of the fields, to a BigQuery table (insert_rows_json expects a list of rows).
    client.insert_rows_json(f'{dataset}.{table}', [element_as_dict])
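To handle the timestamp column the question asks about, here is a minimal sketch of the loop above that parses a timestamp out of each blob name and writes it alongside the file's data. The bucket, the my_dataset.my_table table, the file_timestamp column and the ..._20230115T120000.json filename pattern are assumptions; adapt them to your real naming convention:

import json
import re
from datetime import datetime

from google.cloud import bigquery
from google.cloud import storage

# Hypothetical names: adapt the bucket, table and filename pattern.
BUCKET_NAME = 'mybucket'
TABLE_ID = 'my_dataset.my_table'
TIMESTAMP_RE = re.compile(r'(\d{8}T\d{6})')  # e.g. shakespeare_20230115T120000.json

storage_client = storage.Client()
bq_client = bigquery.Client()

for blob in storage_client.get_bucket(BUCKET_NAME).list_blobs(prefix='shakespeare'):
    match = TIMESTAMP_RE.search(blob.name)
    if not match:
        continue  # skip files whose name has no timestamp

    # Timestamp parsed from the filename, formatted for a BigQuery TIMESTAMP column.
    file_ts = datetime.strptime(match.group(1), '%Y%m%dT%H%M%S').isoformat()

    row = json.loads(blob.download_as_text())  # assumes one JSON object per file
    row['file_timestamp'] = file_ts

    errors = bq_client.insert_rows_json(TABLE_ID, [row])
    if errors:
        print(f'Insert errors for {blob.name}: {errors}')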
You can also apply the same logic with a Beam/Dataflow pipeline triggered by a cron job from Cloud Scheduler (Java example below):
PCollection<KV<String, String>> filesAndLines =
    p.apply(FileIO.match().filepattern("bucket_path/*.csv"))
     .apply(FileIO.readMatches())
     .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
       @ProcessElement
       public void process(ProcessContext c) throws IOException {
         FileIO.ReadableFile f = c.element();
         String filename = f.getMetadata().resourceId().toString();
         String line;
         try (BufferedReader r = new BufferedReader(
             new InputStreamReader(Channels.newInputStream(f.open()), StandardCharsets.UTF_8))) {
           while ((line = r.readLine()) != null) {
             c.output(KV.of(filename, line));
           }
         }
       }
     }));
You can apply your own logic to each file_name => line pair and write the result to BigQuery via BigQueryIO.
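For reference, here is a minimal sketch of that pipeline with the Beam Python SDK, under the same assumptions as above (one JSON object per file, hypothetical filename pattern and table name):

import json
import re
from datetime import datetime

import apache_beam as beam
from apache_beam.io.fileio import MatchFiles, ReadMatches

TIMESTAMP_RE = re.compile(r'(\d{8}T\d{6})')  # hypothetical filename pattern


def to_row(readable_file):
    """Build a BigQuery row from one matched file, adding the filename timestamp."""
    name = readable_file.metadata.path
    row = json.loads(readable_file.read_utf8())
    ts = TIMESTAMP_RE.search(name)
    row['file_timestamp'] = (
        datetime.strptime(ts.group(1), '%Y%m%dT%H%M%S').isoformat() if ts else None
    )
    return row


with beam.Pipeline() as p:
    (p
     | MatchFiles('gs://mybucket/shakespeare*.json')
     | ReadMatches()
     | beam.Map(to_row)
     | beam.io.WriteToBigQuery(
           'my_project:my_dataset.my_table',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))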
I think the Cloud Function solution is more lightweight, but I wanted to show this alternative anyway.
If you have to parse a large number of files and the job duration would exceed one hour, the Beam solution can be interesting, because Cloud Functions (2nd gen) are limited to one hour.
Upvotes: 1