Reputation: 320
I have an Airflow DAG which creates an EMR cluster, then runs SSHOperator tasks on that cluster. Right now, I am hard coding the Master public DNS for the EMR cluster into an Airflow SSH connection. Is there a way for my DAG to dynamically populate this DNS when the EMR cluster is created so I don't have to manually update the connection?
Upvotes: 2
Views: 1109
Reputation: 1401
You can use Airflow XCom to pass values from one task to another. In your use case, you can pass the EMR master DNS value from the EMR creation task to the SSH task via an XCom variable.
Pushing data to XCom:
context['ti'].xcom_push(key="xcom_key", value="DNS_NAME")
Pulling data from XCom:
context['ti'].xcom_pull(key="xcom_key", task_ids="EMR_Task")
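As a rough sketch of how the two tasks could be wired together (the task ids, the `get_emr_dns` callable, the XCom key, and the boto3 `describe_cluster` lookup are assumptions for illustration, not from the original post):

from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {"start_date": datetime(2019, 1, 1)}

with DAG("emr_ssh_example", default_args=default_args, schedule_interval=None) as dag:

    def get_emr_dns(**context):
        # Hypothetical helper: look up the master public DNS of the cluster
        # created by an upstream task and push it to XCom.
        cluster_id = context["ti"].xcom_pull(task_ids="create_emr_cluster")
        emr = boto3.client("emr")
        dns = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]["MasterPublicDnsName"]
        context["ti"].xcom_push(key="master_dns", value=dns)

    push_dns = PythonOperator(
        task_id="push_emr_dns",
        python_callable=get_emr_dns,
        provide_context=True,
    )

    # remote_host is a templated field on SSHOperator in recent Airflow 1.10.x
    # releases, so the DNS pushed above can be pulled straight into the task.
    run_step = SSHOperator(
        task_id="run_on_emr",
        ssh_conn_id="aws_emr",
        remote_host="{{ ti.xcom_pull(task_ids='push_emr_dns', key='master_dns') }}",
        command="echo hello from EMR",
    )

    push_dns >> run_step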
Upvotes: 1
Reputation: 320
After a bit more digging into the Airflow CLI, I found it is possible to create and delete connections. I've added a BashOperator after building the EMR cluster to recreate the Airflow connection:
airflow connections --delete --conn_id aws_emr
airflow connections --add --conn_id aws_emr --conn_type SSH --conn_host publicDNS --conn_login username --conn_extra '{"key_file":"file.pem"}'
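A minimal sketch of embedding this in the DAG itself, pulling the DNS from XCom (the task id, XCom key, login, and key file path are placeholders, not from the original post):

from airflow.operators.bash_operator import BashOperator

# Recreate the SSH connection with the freshly created cluster's DNS.
# The single quotes around --conn_extra keep the JSON intact in the shell.
refresh_conn = BashOperator(
    task_id="refresh_emr_connection",
    bash_command=(
        "airflow connections --delete --conn_id aws_emr && "
        "airflow connections --add --conn_id aws_emr --conn_type SSH "
        "--conn_host {{ ti.xcom_pull(task_ids='push_emr_dns', key='master_dns') }} "
        "--conn_login username "
        "--conn_extra '{\"key_file\":\"file.pem\"}'"
    ),
)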
Upvotes: 2