LearnerJS

Reputation: 299

How to invoke the same Cloud Run service from within a Cloud Run instance to run requests in parallel?

I am running an ETL process using Cloud Run.

I have 2,000 files, but only 1,200 of them get preprocessed and loaded into BigQuery because the Cloud Run request times out. So I thought of dividing the load.

I am dividing the 2,000 files into 4 sets of 500 each, authenticating, and using requests.post to call the same Cloud Run service. However, it executes one set after another on the same Cloud Run instance, and it times out again.

How can I make it run in parallel?

As of now: max instances: 20, concurrency: 1, CPU: 2, memory: 8 GB.
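For reference, this is roughly how the service is deployed (a sketch; the service name, image, and region below are placeholders, not my real values):

    gcloud run deploy etl-service \
      --image gcr.io/my-project/etl-image \
      --region us-central1 \
      --cpu 2 \
      --memory 8Gi \
      --concurrency 1 \
      --max-instances 20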

Upvotes: 1

Views: 909

Answers (1)

TSnake

Reputation: 480

Well, I have worked on something like this. I am not sure if it will help, since you haven't shared a single block of code, but here is a sample based on how I would download 2,000 JSON files.

You have 2,000 files, and 1,200 of them get processed and loaded into BigQuery before Cloud Run times out. What you can do is:

    total_batches = len(file_list) // 1000   # e.g. with a 2000-file list, total_batches will be 2

    # divide the files into batches of 1000 and trigger them one by one
    for batch in range(1, total_batches + 1):
        auth_and_trigger(file_list[(batch - 1) * 1000:batch * 1000])

    # finally trigger whatever is left over after the full batches of 1000
    if len(file_list) % 1000:
        auth_and_trigger(file_list[total_batches * 1000:])
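With 2,000 files this fires two batch calls, one for file_list[0:1000] and one for file_list[1000:2000]; the leftover call only runs when the total is not an exact multiple of 1,000.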

Now, this is how you can call the Cloud Run service, with an auth_and_trigger function for every batch of 1,000 files:

    import logging
    import threading

    import requests


    def auth_and_trigger(rest_of_files):
        # your Cloud Run service URL
        receiving_service_url = 'https://cloudrun-url-uc.a.run.app/download'

        # Set up the metadata server request
        # See https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
        metadata_server_token_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='

        token_request_url = metadata_server_token_url + receiving_service_url
        token_request_headers = {'Metadata-Flavor': 'Google'}

        # Fetch an identity token for the receiving service
        token_response = requests.get(token_request_url, headers=token_request_headers)
        jwt = token_response.content.decode("utf-8")

        # Provide the token in the request to the receiving service
        receiving_service_headers = {'Authorization': f'Bearer {jwt}'}

        try:
            threading.Thread(target=trigger_ingest,
                             args=(receiving_service_url,
                                   {"files": rest_of_files},
                                   receiving_service_headers
                                   )).start()
        except Exception as error:
            logging.error(error)

Each thread calls a trigger_ingest function that makes the actual request to the Cloud Run service. The code for it is below:

    def trigger_ingest(url, payload, headers=None):
        # POST one batch of files to the Cloud Run service and log the response
        service_response = requests.post(url=url,
                                         json=payload,
                                         headers=headers)
        logging.info(service_response.content)
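For completeness, the receiving side of the same service needs a route that accepts each batch. Here is a minimal sketch assuming a Flask app; the process_and_load helper and its BigQuery details are placeholders, not something from your setup:

    from flask import Flask, request, jsonify

    app = Flask(__name__)

    @app.route('/download', methods=['POST'])
    def download():
        # each POST from trigger_ingest carries one batch under the "files" key
        files = request.get_json().get("files", [])
        for file_name in files:
            process_and_load(file_name)   # placeholder: preprocess the file and load it into BigQuery
        return jsonify({"processed": len(files)}), 200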

Now, since you want parallel execution, make sure no code gets repeated in the threads the way it currently does in your trigger for the Cloud Run service.
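One more thing worth noting: with concurrency set to 1, each of these POSTs lands on its own instance (up to your 20 max instances), so the batches really do run in parallel. If an individual batch still runs long, you can also raise the request timeout on the service; a minimal sketch, where the service name and the 900-second value are just examples:

    gcloud run services update etl-service --timeout=900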

Upvotes: 2
