Reputation: 129
I installed Airflow using docker-compose and ran the db init command. I am trying to create a DAG that uses the DockerOperator to execute a script. Inside my DockerOperator, the script tries to read Airflow Variables and get Connections using the BaseHook. But it seems that the script in my DockerOperator is connecting to an (empty) default SQLAlchemy database and not to my initialized Postgres database, which has the Connections and Variables I populated through the UI.
Is there a way to give the DockerOperator access to the Airflow database containing the Connections and Variables set through the UI?
My Airflow DAG
from datetime import datetime
from json import dumps

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models import XCom
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.session import provide_session

local_tz = pendulum.timezone("America/Los_Angeles")  # affects the schedule interval's time zone

args = {"owner": "Airflow"}
SYNC_SCRIPT_DAG = "sync_script_dag_v1"


@provide_session
def cleanup_xcom(context, session=None):
    # Success/failure callbacks receive the task context as a single argument.
    print("Cleaning up!!!")
    dag_id = context["dag"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()


#################### Define Tasks #####################
with DAG(
    dag_id=SYNC_SCRIPT_DAG,
    default_args=args,
    catchup=False,
    start_date=datetime(2020, 7, 8, tzinfo=local_tz),
    max_active_runs=1,
    tags=["production"],
    schedule_interval=None,
    params={
        # these env vars are expected by the script
        "SOURCE_ONE": "source_two_credential",  # credential ID in Airflow
        "LOG_LEVEL": "INFO",
        "NUM_WORKER_THREADS": "10",
    },
    on_failure_callback=cleanup_xcom,
    on_success_callback=cleanup_xcom,
) as dag:

    @task(task_id="get_airflow_params_task")
    def get_airflow_params(**context):
        airflow_params = context.get("params")
        return dumps(airflow_params)

    get_params_task = get_airflow_params()

    new_task = DockerOperator(
        task_id="my_task_id",
        image="sync-script:latest",
        api_version="auto",
        auto_remove=True,
        # SOURCE_ONE_JIRA_USER should evaluate to the connection dict for the
        # Airflow "source_two_credential" ID
        command="""main.py
            --SOURCE_ONE_JIRA_USER '{{ conn[params.SOURCE_ONE] }}'
            --airflow_params '{{ ti.xcom_pull(task_ids="get_airflow_params_task") }}'
        """,
        environment={
            "YML_CONFIG": "yml_config",
        },
        docker_url="unix://var/run/docker.sock",
        network_mode="host",
        force_pull=True,
        docker_conn_id="harbor_credentials",
    )

    get_params_task >> new_task
Now that I know there is no way to reach the Airflow Variables or Connections from inside the DockerOperator, I have removed the original error I was getting.
What I was hoping for is a way to pass a connection_id in via Airflow params and later hand that connection dictionary to my Python script through the DockerOperator. This would allow different users triggering the DAG to pass in their own credential_id instead of having to overwrite it.
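For reference, here is roughly what main.py inside the image does with those flags (a minimal sketch; the argparse flag names mirror the command above, and the parsing details are illustrative):
import argparse
import json


def parse_args():
    parser = argparse.ArgumentParser()
    # Both values are rendered by Jinja on the Airflow side before the
    # container starts; they arrive here as plain strings.
    parser.add_argument("--SOURCE_ONE_JIRA_USER", required=True)
    parser.add_argument("--airflow_params", required=True)
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    # airflow_params was produced by json.dumps() in the DAG, so it parses as JSON
    params = json.loads(args.airflow_params)
    print(params.get("LOG_LEVEL"), args.SOURCE_ONE_JIRA_USER)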
Upvotes: 1
Views: 1567
Reputation: 15931
The code you are executing within DockerOperator runs in a "closed environment": it cannot access Airflow resources. Should you need them, you must pass them in when initializing the DockerOperator.
The command parameter is a templated field, so simply use Jinja to achieve that:
p1_auth_task = DockerOperator(
    task_id="auth_v1",
    image="tako:latest",
    api_version="auto",
    auto_remove=True,
    command="auth.py --airflowmode {{ var.value.MY_INITIALIZED_VAR }}",
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",
    mount_tmp_dir=False,
)
You can also pass values via environment if you prefer.
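For instance, a minimal sketch of the environment variant (environment is also a templated field on DockerOperator; the Variable name is the same illustrative one as above):
p1_auth_env_task = DockerOperator(
    task_id="auth_v1_env",
    image="tako:latest",
    api_version="auto",
    auto_remove=True,
    command="auth.py",
    environment={
        # environment is a templated field too, so Jinja works here as well
        "AIRFLOW_MODE": "{{ var.value.MY_INITIALIZED_VAR }}",
    },
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",
    mount_tmp_dir=False,
)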
Edit: Since you are after a connection, simply use the connection macro, as explained in use Airflow connection from a jinja template.
In your case:
epic_task = DockerOperator(
    task_id="epic_task",
    image="sync-script:latest",
    api_version="auto",
    auto_remove=True,
    command="main.py",
    environment={
        "YML_CONFIG": "configs/epic_config.yaml",
        "AIRFLOW_PARAMS_SOURCE_ONE_HOST": "{{ conn.SOURCE_ONE.host }}",
        "AIRFLOW_PARAMS_SOURCE_TWO_HOST": "{{ conn.SOURCE_TWO.host }}",
    },
    docker_url="unix://var/run/docker.sock",
    network_mode="host",
    force_pull=True,
)
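Inside the container, the rendered values then arrive as ordinary environment variables; a minimal sketch of the script side (variable names match the operator above):
import os

# The Jinja expressions were resolved by Airflow before the container started,
# so these are plain strings by the time the script reads them.
yml_config = os.environ["YML_CONFIG"]
source_one_host = os.environ["AIRFLOW_PARAMS_SOURCE_ONE_HOST"]
source_two_host = os.environ["AIRFLOW_PARAMS_SOURCE_TWO_HOST"]
print(f"config={yml_config} hosts={source_one_host},{source_two_host}")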
Upvotes: 2