Reputation: 21
I have an on-premises CDAP (Data Fusion) instance with multiple namespaces. How can I trigger a pipeline using Airflow operators? I have explored the available Airflow operators and this page, but neither was very helpful: https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline
Upvotes: 2
Views: 705
Reputation: 7287
This assumes you have already deployed the pipeline and know the location, instance name, and pipeline name of the pipeline you want to run. See CloudDataFusionStartPipelineOperator() for the parameters it accepts.
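Since the question mentions multiple namespaces: as far as I know, the operator also accepts a namespace argument that defaults to "default", so that is worth checking in its parameter list. For a CDAP instance that the operator cannot reach, a fallback is to call the CDAP REST endpoint from the page linked in the question. Below is a minimal sketch of building that request with the standard library. The helper names, the host, and the "analytics" namespace are hypothetical, and it assumes a batch pipeline, whose workflow is named DataPipelineWorkflow.

```python
from urllib.parse import quote
from urllib.request import Request


def cdap_start_url(endpoint, namespace, pipeline_name):
    """Build the CDAP REST URL that starts a batch pipeline in a namespace."""
    base = endpoint.rstrip("/")
    return (
        f"{base}/v3/namespaces/{quote(namespace, safe='')}"
        f"/apps/{quote(pipeline_name, safe='')}"
        "/workflows/DataPipelineWorkflow/start"
    )


def build_start_request(endpoint, namespace, pipeline_name, token=None):
    """Return an unsent POST request; add a bearer token only if one is given."""
    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    url = cdap_start_url(endpoint, namespace, pipeline_name)
    return Request(url, method="POST", headers=headers)


# Hypothetical on-premises endpoint and namespace, for illustration only.
request = build_start_request(
    "http://cdap.example.internal:11015", "analytics", "DataFusionQuickstart"
)
print(request.get_method(), request.full_url)
```

The request is only constructed here, not sent. Inside a DAG it could be sent with urllib.request.urlopen(request) from a PythonOperator callable, with whatever authentication your CDAP installation requires.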
Using the quickstart pipeline, I triggered a run with CloudDataFusionStartPipelineOperator(). See the operator usage below:
import datetime

import airflow
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator

# Use a start date one day in the past so the DAG is eligible to run right away.
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'trigger_df',
        catchup=False,  # keyword argument, not a quoted string
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Start the already-deployed pipeline on the given Data Fusion instance.
    start_pipeline = CloudDataFusionStartPipelineOperator(
        location='us-central1',
        pipeline_name='DataFusionQuickstart',
        instance_name='test',
        task_id="start_pipeline",
    )

    start_pipeline
Success "Graph View":
Logs:
Upvotes: 1