srinidhi

Reputation: 11

How to launch a Cloud Dataflow pipeline from a Google Cloud Function when a particular set of files reaches Cloud Storage

I have a requirement to create a Cloud Function that checks for a set of files in a GCS bucket; only when all of those files have arrived in the bucket should it launch the Dataflow templates for those files.

My existing Cloud Function code launches a Cloud Dataflow job for each file that arrives in the GCS bucket, running a different Dataflow template for each file based on its naming convention. This existing code works fine, but my intention is not to trigger Dataflow for each uploaded file directly. It should check for the set of files and, only once all of the files have arrived, launch the Dataflow jobs for those files.

Is there a way to do this using Cloud Functions, or is there an alternative way of achieving the desired result?

from googleapiclient.discovery import build
import time

def df_load_function(file, context):
    filesnames = [
        'Customer_',
        'Customer_Address',
        'Customer_service_ticket'
    ]

    # Check the uploaded file and run related dataflow jobs.
    for i in filesnames:
        if 'inbound/{}'.format(i) in file['name']:
            print("Processing file: {filename}".format(filename=file['name']))

            project = 'xxx'
            inputfile = 'gs://xxx/inbound/' + file['name']
            job = 'df_load_wave1_{}'.format(i)
            template = 'gs://xxx/template/df_load_wave1_{}'.format(i)
            location = 'asia-south1'

            dataflow = build('dataflow', 'v1b3', cache_discovery=False)
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=location,
                body={
                    'jobName': job,
                    'environment': {
                        'workerRegion': 'asia-south1',
                        'tempLocation': 'gs://xxx/temp'
                    }
                }
            )

            # Execute the dataflow job
            response = request.execute()

            job_id = response["job"]["id"]

I've written the code below for this functionality. The Cloud Function runs without any error, but it doesn't trigger any Dataflow job. I'm not sure what is happening, as the logs show no errors.

from googleapiclient.discovery import build
import time
import os

def df_load_function(file, context):
    filesnames = [
        'Customer_',
        'Customer_Address_',
        'Customer_service_ticket_'
    ]
    paths = ['Customer_', 'Customer_Address_', 'Customer_service_ticket_']

    for path in paths:
        if os.path.exists('gs://xxx/inbound/') == True:
            # Check the uploaded file and run related dataflow jobs.
            for i in filesnames:
                if 'inbound/{}'.format(i) in file['name']:
                    print("Processing file: {filename}".format(filename=file['name']))

                    project = 'xxx'
                    inputfile = 'gs://xxx/inbound/' + file['name']
                    job = 'df_load_wave1_{}'.format(i)
                    template = 'gs://xxx/template/df_load_wave1_{}'.format(i)
                    location = 'asia-south1'

                    dataflow = build('dataflow', 'v1b3', cache_discovery=False)
                    request = dataflow.projects().locations().templates().launch(
                        projectId=project,
                        gcsPath=template,
                        location=location,
                        body={
                            'jobName': job,
                            'environment': {
                                'workerRegion': 'asia-south1',
                                'tempLocation': 'gs://xxx/temp'
                            }
                        }
                    )

                    # Execute the dataflow job
                    response = request.execute()

                    job_id = response["job"]["id"]

                else:
                    exit()

Could someone please help me with the above Python code? Also, my file names have the current date at the end, as these are incremental files that I receive from different source teams.

Upvotes: 0

Views: 614

Answers (1)

Patb

Reputation: 46

If I'm understanding your question correctly, the easiest thing to do is to write basic logic in your function that determines whether the entire set of files is present. If not, exit the function. If yes, run the appropriate Dataflow pipeline. Basically, implement what you wrote in your first paragraph as Python code; a rough sketch of that shape follows.
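To make that concrete, the overall shape of the function can be as small as the sketch below. Here all_files_present and launch_dataflow_jobs are hypothetical helper names standing in for the completeness check and for the template-launch code you already have:

def df_load_function(file, context):
    # all_files_present() and launch_dataflow_jobs() are hypothetical helpers:
    # the first returns True only once every expected file is in the bucket,
    # the second wraps the templates().launch() calls from your existing code.
    if not all_files_present():
        return  # set incomplete: exit without launching anything
    launch_dataflow_jobs()  # set complete: run the appropriate Dataflow pipelines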

If it's a small set of files, it shouldn't be an issue to have a function run on each upload to check set completeness. Even if it's, for example, 10,000 files a month, the cost is extremely small for this service, assuming:

  1. Your function isn't using lots of bandwidth to transfer data
  2. The code for each function invocation doesn't take a long time to run.

Even in scenarios where you can't meet these requirements, Cloud Functions is still pretty cheap to run.

If you're worried about costs I would recommend checking out the Google Cloud Pricing Calculator to get an estimate.

Edit with updated code:

I would highly recommend using the Google Cloud Storage Python client library for this. Using os.path likely won't work as there are additional underlying steps required to search a bucket...and probably more technical details there than I fully understand.

To use the Python client library, add google-cloud-storage to your requirements.txt. Then, use something like the following code to check the existence of an object. This example is based off an HTTP trigger, but the gist of the code to check object existence is the same.

from google.cloud import storage

def hello_world(request):
    # Instantiate GCS client
    client = storage.Client()

    # Instantiate bucket definition
    bucket = client.bucket("bucket-name")

    # Search for each expected object (filenames is your list of expected object names)
    for file in filenames:
        if bucket.blob(file).exists() and "name_modifier" in file:
            pass  # Run name_modifier Dataflow job here
        elif bucket.blob(file).exists() and "name_modifier_2" in file:
            pass  # Run name_modifier_2 Dataflow job here
        else:
            return "File not found"

This code isn't exactly what you want from a logic standpoint, but it should get you started. You'll probably want to first make sure all of the objects can be found, and only then move to a second step where you launch the corresponding Dataflow job for each file; a sketch of that structure follows.
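For example, a sketch of that two-step structure could look like the following. It reuses the 'xxx' placeholder project, bucket, and template paths from your question, and assumes prefix matching is enough to identify the date-suffixed files; treat it as untested starting-point code, not a drop-in solution:

from google.cloud import storage
from googleapiclient.discovery import build

PREFIXES = ['Customer_', 'Customer_Address_', 'Customer_service_ticket_']

def df_load_function(file, context):
    client = storage.Client()

    # Step 1: list what is currently in the inbound folder and check that every
    # expected prefix has at least one matching object (prefix match, because the
    # real file names carry a date suffix).
    present = [blob.name for blob in client.list_blobs('xxx', prefix='inbound/')]
    if not all(any(name.startswith('inbound/' + p) for name in present) for p in PREFIXES):
        return  # the set is not complete yet; wait for the next upload event

    # Step 2: the whole set has arrived, so launch the template for each expected
    # file, reusing the launch call from the question.
    dataflow = build('dataflow', 'v1b3', cache_discovery=False)
    for p in PREFIXES:
        dataflow.projects().locations().templates().launch(
            projectId='xxx',
            gcsPath='gs://xxx/template/df_load_wave1_{}'.format(p),
            location='asia-south1',
            body={
                'jobName': 'df_load_wave1_{}'.format(p),
                'environment': {
                    'workerRegion': 'asia-south1',
                    'tempLocation': 'gs://xxx/temp'
                }
            }
        ).execute()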

Upvotes: 1
