Colin Adams
Colin Adams

Reputation: 90

Setup Cloud Function to trigger Cloud Composer DAG in Terraform

I have a cloud composer environment:

resource "google_composer_environment" "default" {
  name   = "default"
  region = "us-central1"
  config {
    node_count = 5

    node_config {
      zone         = "us-central1-a"
      machine_type = "n1-standard-2"
    }
  }
}

and a cloud function that I use to trigger that environment

resource "google_cloudfunctions_function" "trigger_dag" {
  name    = "trigger_dag"
  runtime = "python37"
  labels = {
    "deployment-tool" = "terraform"
  }

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource = "projects/${var.project_id}/topics/trigger_dag"
  }

  entry_point = "trigger_dag"
  environment_variables = {
    "AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
  }

  source_repository {
    url = local.repo_url
  }

  timeouts {}

  depends_on = [google_composer_environment.default]
}

However, I also need to give the cloud function an IAP client id to use when calling the DAG: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf#getting_the_client_id

Is there a way, in Terraform, to run the python script to generate the client id whenever the DAG is recreated (so the webserver changes and the old id is not valid), and set that id as an environment variable on the cloud function?

Upvotes: 0

Views: 532

Answers (1)

Colin Adams
Colin Adams

Reputation: 90

We got it to work using an external data source to run the python script and return the client id.

data "external" "composer_iap_client_id" {
  program = ["python", "${path.module}/iap_client.py"]

  query = {
    airflow_uri = google_composer_environment.default.config.0.airflow_uri
  }
}

iap_client.py

import json
import sys
import urllib.parse

import requests


def get_iap_client_id(airflow_uri: str) -> str:
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers['location']

    # Extract the client_id query parameter from the redirect.
    parsed = urllib.parse.urlparse(redirect_location)
    query_string = urllib.parse.parse_qs(parsed.query)
    return query_string['client_id'][0]


def main() -> None:
    json_input = json.load(sys.stdin)
    client_id = get_iap_client_id(json_input['airflow_uri'])
    json.dump({'iap_client_id': client_id}, sys.stdout)


if __name__ == '__main__':
    main()

updated the cloud function to pull the iap client id as an environment variable:

  ...
  environment_variables = {
    "AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
    "IAP_CLIENT_ID" = data.external.composer_iap_client_id.result.iap_client_id
  }
  ...

Upvotes: 1

Related Questions