Anonymous

Reputation: 21

How to trigger a CDAP pipeline using Airflow operators?

I have an on-premise CDAP Data Fusion instance with multiple namespaces. How can I trigger a pipeline using Airflow operators? I have looked through the available Airflow operators and this page, but did not find them very helpful: https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline

Upvotes: 2

Views: 705

Answers (1)

Ricco D

Reputation: 7287

This assumes you have already deployed the pipeline and know the location, instance name, and pipeline name of the pipeline you want to run. See the CloudDataFusionStartPipelineOperator() documentation for the parameters it accepts.

Using the quickstart pipeline, I triggered the pipeline using CloudDataFusionStartPipelineOperator(). See operator usage below:

import datetime

import airflow
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'trigger_df',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    start_pipeline = CloudDataFusionStartPipelineOperator(
        location='us-central1',
        pipeline_name='DataFusionQuickstart',
        instance_name='test',
        task_id="start_pipeline",
    )

    start_pipeline

[Screenshot: successful run in the Airflow "Graph View"]

[Screenshot: task logs]
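Since the question mentions multiple namespaces: CloudDataFusionStartPipelineOperator() also accepts a namespace argument (it defaults to "default"). The operator targets Cloud Data Fusion instances, so for an on-premise CDAP instance it may be simpler to call the CDAP REST endpoint from the page linked in the question directly. The following is only a minimal sketch of that call, not a tested integration. The host http://cdap-host:11015, the namespace dev, the helper names build_start_url and start_pipeline, and the bearer-token authentication are all assumptions to adapt to your setup.

```python
import json
import urllib.request
from urllib.parse import quote


def build_start_url(cdap_endpoint, namespace, pipeline_name):
    """Return the CDAP REST URL that starts a batch pipeline.

    Batch pipelines run as the DataPipelineWorkflow program of the app.
    """
    base = cdap_endpoint.rstrip("/")
    return (
        f"{base}/v3/namespaces/{quote(namespace, safe='')}"
        f"/apps/{quote(pipeline_name, safe='')}"
        "/workflows/DataPipelineWorkflow/start"
    )


def start_pipeline(cdap_endpoint, namespace, pipeline_name,
                   runtime_args=None, token=None):
    """POST to the start URL; runtime arguments go in the JSON body."""
    headers = {"Content-Type": "application/json"}
    if token:
        # Assumption: the instance is secured with bearer-token auth.
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(
        build_start_url(cdap_endpoint, namespace, pipeline_name),
        data=json.dumps(runtime_args or {}).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


# Hypothetical endpoint and namespace, shown only to illustrate the URL shape.
print(build_start_url("http://cdap-host:11015", "dev", "DataFusionQuickstart"))
```

In a DAG, a function like start_pipeline could be passed to a PythonOperator as python_callable, with the endpoint, namespace, and pipeline name supplied through op_kwargs.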

Upvotes: 1
