Reputation: 43
Can I trigger an Airflow task from a Cloud Function?
Basically my problem is this: files arrive in Google Cloud Storage (multiple files handled by the same DAG), and I need to trigger a transformation job when a file arrives. I was thinking of using a Cloud Function, but there are a lot of dependent jobs in my DAG.
Any help is appreciated
Upvotes: 2
Views: 9408
Reputation: 29
I believe there is a difference between a Cloud Function and the GoogleCloudStoragePrefixSensor. A Cloud Function fires whenever the event occurs, i.e. on file arrival, so there is a guarantee that only the newly arrived file is picked up. In contrast, the GoogleCloudStoragePrefixSensor runs at a predefined time (as part of a DAG) and picks up any EXISTING file matching the prefix in the given location. For example, in a daily job, today's file arrives and is processed; if that file continues to lie there, the same file may be picked up again by the sensor. So you need to be careful in how you apply it. Of course you can add extra logic to pick up only the latest file and to archive a file once it has been processed, but that has to be built by you.
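For illustration, a minimal sketch of that extra logic using the google-cloud-storage client (the bucket name, prefix and "processed/" archive folder below are placeholders, not something from the question) could look like this:
from google.cloud import storage

def latest_blob(bucket_name, prefix):
    """Return the most recently updated object under the prefix, or None."""
    client = storage.Client()
    blobs = list(client.list_blobs(bucket_name, prefix=prefix))
    return max(blobs, key=lambda b: b.updated) if blobs else None

def archive_blob(blob, archive_prefix='processed/'):
    """Move a processed object into an archive folder so the sensor won't see it again."""
    bucket = blob.bucket
    bucket.copy_blob(blob, bucket, archive_prefix + blob.name)
    blob.delete()

# Example usage inside a PythonOperator callable:
# blob = latest_blob('bucketname', 'folder/file_')
# if blob:
#     ...run the transformation on blob...
#     archive_blob(blob)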
Upvotes: 0
Reputation: 512
You don't necessarily need a Cloud Function to sense for the file in GCS; Composer ships with GCS sensors which can be used for this purpose.
Suppose you have to monitor files matching bucket/folder/file_*.csv, then:
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
import datetime as dt

lasthour = dt.datetime.now() - dt.timedelta(hours=1)

args = {
    'owner': 'airflow',
    'start_date': lasthour,
    'depends_on_past': False,
}

dag = DAG(
    dag_id='GCS_sensor_dag',
    schedule_interval=None,  # the DAG re-triggers itself, so no schedule is needed
    default_args=args,
)

# Waits (pokes) until at least one object matching the prefix exists in the bucket.
file_sensor = GoogleCloudStoragePrefixSensor(
    task_id='gcs_polling',
    bucket='bucketname',
    prefix='folder/file_',
    dag=dag,
)

# Lists the matching .csv objects; the result is pushed to XCom for downstream tasks.
GCS_File_list = GoogleCloudStorageListOperator(
    task_id='list_Files',
    bucket='bucketname',
    prefix='folder/file_',
    delimiter='.csv',
    google_cloud_storage_conn_id='google_cloud_default',
    dag=dag,
)

# Re-triggers this same DAG so it goes back to waiting for the next file.
trigger = TriggerDagRunOperator(
    task_id='trigger_dag_{timestamp}_rerun'.format(
        timestamp=int((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds() * 1000)),
    trigger_dag_id='GCS_sensor_dag',
    dag=dag,
)

file_sensor >> GCS_File_list >> trigger
Upvotes: 6
Reputation: 3883
You can trigger DAGs in response to a change in a Cloud Storage bucket: Cloud Composer DAGs can be triggered by Cloud Functions. There is already great official documentation and a Codelab which describe this workflow.
One thing to remember: when you are at the Creating your function step, you need to fill in this line: const WEBSERVER_ID = 'your-tenant-project-id';. To retrieve that value, go to the Airflow UI, then Admin -> Configuration, and search for the base_url key, which contains your webserver-id (without the https:// and .appspot.com parts).
Another way to get it is to use the following command:
gcloud composer environments describe <ENVIRONMENT_NAME> --location <LOCATION>
There you will be able to see the airflowUri value under config:.
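For reference, a minimal sketch of such a trigger function written in Python (the Codelab uses Node.js) might look like the following. CLIENT_ID, WEBSERVER_ID and DAG_NAME are placeholders you must fill in yourself, the function name trigger_dag is arbitrary, and it calls the webserver's experimental REST API through IAP, so treat it as an illustration of the flow rather than copy-paste code:
import google.auth.transport.requests
import google.oauth2.id_token
import requests

CLIENT_ID = 'your-iap-client-id.apps.googleusercontent.com'  # IAP OAuth client ID (placeholder)
WEBSERVER_ID = 'your-tenant-project-id'                      # from base_url, as described above
DAG_NAME = 'GCS_sensor_dag'                                  # the DAG you want to trigger

def trigger_dag(data, context=None):
    """Background Cloud Function, triggered by a GCS 'object finalize' event."""
    url = 'https://{}.appspot.com/api/experimental/dags/{}/dag_runs'.format(WEBSERVER_ID, DAG_NAME)
    # Fetch an OpenID Connect token for the IAP-protected Airflow webserver.
    auth_request = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(auth_request, CLIENT_ID)
    response = requests.post(
        url,
        headers={'Authorization': 'Bearer {}'.format(token)},
        json={'conf': {'file': data['name']}},  # pass the uploaded object's name to the DAG run
    )
    response.raise_for_status()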
I've tried that scenario once and it works fine. Feel free to ask more questions. I hope you find the above information useful.
Upvotes: 2