MrBronson

Reputation: 612

Airflow DockerOperator create Mount objects dynamically

Is there a way to create the Mount objects of DockerOperator dynamically so that I can use the filesystem connection stored in Airflow db? I don't want to change the dag code if the connection changes.

At the moment I have to hardcode the paths like this:

```python
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

# Host paths are hardcoded, so the DAG code must change whenever they do
incoming_path = "/incoming/XYZ"
output_path = "/output_path/ABC"

# Bind-mount each host path at the same location inside the container
input_mount = {"source": incoming_path,
               "target": incoming_path,
               "type": "bind"}

output_mount = {"source": output_path,
                "target": output_path,
                "type": "bind"}

create_stuff = DockerOperator(
    task_id='create_stuff',
    user=1234,
    queue='default',
    image='image_name',
    api_version='auto',
    auto_remove=True,
    mount_tmp_dir=False,
    mounts=[Mount(**input_mount),
            Mount(**output_mount)],
    environment={
        'AF_EXECUTION_DATE': "{{ ds }}",
        'AF_OWNER': "{{ task.owner }}",
    },
    command="do stuff",
    entrypoint='',
    docker_url='unix://var/run/docker.sock',
    network_mode='bridge',
)
```

I tried to use FSHook outside an operator, but there `get_path()` returns an empty string:

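```python
from airflow import DAG
from airflow.decorators import task
from airflow.hooks.filesystem import FSHook

with DAG(...) as dag:
    ...

    # THIS WORKS: the hook is created when the task runs
    @task
    def task1():
        incoming_hook = FSHook('fs_incoming')
        incoming_path = incoming_hook.get_path()
        ...

    # THIS RETURNS AN EMPTY STRING: the hook is created while the DAG file is parsed
    incoming_hook = FSHook('fs_incoming')
    incoming_path = incoming_hook.get_path()
```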

Put another way: is there a way to get the path from the connection outside an operator?

I'm using Airflow 2.4.1

Upvotes: 0

Views: 433

Answers (1)

RNHTTR

Reputation: 2525

Given that you just need to share a common string between multiple DAGs, I'd recommend either using an Airflow Variable or using some kind of shared config file in your /dags directory. You can find more detail in this answer.
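A minimal sketch of the shared-config-file option, assuming a hypothetical `mount_paths.json` in the dags folder that maps names to host paths (e.g. `{"incoming": "/incoming/XYZ", "output": "/output_path/ABC"}`). The helper names are illustrative; they only build the same dicts the question hardcodes, which are then wrapped in `docker.types.Mount`.

```python
import json
from pathlib import Path
from typing import Dict, Iterable, List


def bind_mounts(paths: Iterable[str]) -> List[Dict[str, str]]:
    """Build one bind-mount spec per host path, mounted at the same path in the container."""
    return [{"source": p, "target": p, "type": "bind"} for p in paths]


def load_bind_mounts(config_file: Path) -> List[Dict[str, str]]:
    """Read a JSON object such as {"incoming": "/incoming/XYZ"} and build bind-mount specs.

    The config file is hypothetical: keep it next to the DAGs so they can all share it,
    and edit it (not the DAG code) when a path changes.
    """
    paths = json.loads(Path(config_file).read_text())
    return bind_mounts(paths.values())
```

In the DAG file this could be used as `mounts=[Mount(**spec) for spec in load_bind_mounts(Path(__file__).parent / "mount_paths.json")]`. With Variables instead, the same `bind_mounts` helper applies, e.g. `bind_mounts([Variable.get("incoming_path"), Variable.get("output_path")])` with hypothetical Variable keys; keep in mind that Airflow's best-practices guide advises against reading Variables in top-level DAG code, since that code runs every time the file is parsed.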


Forcing the use of a hook here is unnecessary, since Airflow Connections are "used for storing credentials and other information necessary for connecting to external services". I don't believe that applies here, but if you did need to use a Connection, you wouldn't need a Hook; you could just use the Connection API to access Connection properties.
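A minimal sketch of the Connection-API route, assuming the fs connection stores its directory in the `path` extra (the field FSHook reads in Airflow 2.x). `path_from_connection` is a hypothetical helper name; the optional `get_connection` argument exists only so the function can be exercised without Airflow, and by default it falls back to `BaseHook.get_connection`.

```python
from typing import Any, Callable, Optional


def path_from_connection(
    conn_id: str,
    get_connection: Optional[Callable[[str], Any]] = None,
) -> str:
    """Return the "path" extra of an Airflow File (path) connection, without creating a hook."""
    if get_connection is None:
        # Imported lazily so the helper can be tested without Airflow installed
        from airflow.hooks.base import BaseHook

        get_connection = BaseHook.get_connection
    conn = get_connection(conn_id)
    # extra_dejson is the connection's Extra field parsed as a dict
    path = conn.extra_dejson.get("path", "")
    if not path:
        # Fail loudly instead of silently bind-mounting an empty path
        raise ValueError(f"connection {conn_id!r} has no 'path' in its extras")
    return path
```

Usage would look like `incoming_path = path_from_connection("fs_incoming")` followed by `Mount(source=incoming_path, target=incoming_path, type="bind")`. Raising on an empty value is a deliberate choice: if a parse-time lookup still comes back empty, as in the question, the DAG shows an import error rather than starting a container with a bad mount.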

Hooks give you additional functionality for actually interacting with external systems. FSHook would allow you to actually interact with a filesystem rather than just share the path value across DAGs.

Upvotes: 1
