Frank

Reputation: 671

Run multiple iterations of one Airflow Operator

I am building a system that is supposed to list files on a remote SFTP server and then download the files locally. I want this to run in parallel, so that I can initiate one job per file to be downloaded, or set an upper limit of, say, 10 simultaneous downloads.

I am new to Airflow and still don't fully understand everything. I assume there is a solution for this, but I just can't figure it out.

This is my code. Currently I download all files in one operator, but as far as I know, it does not use multiple workers.

def transfer_files():
    # sftp is an already-open SFTP client; REMOTE_PATH and LOCAL_PATH are set elsewhere
    for i in range(1, 11):
        sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))
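For reference, a minimal sketch of the connection setup the snippet above assumes, using paramiko (the host, credentials, and paths are placeholders, not my real values):

import paramiko

REMOTE_PATH = '/remote/data/'   # hypothetical remote directory
LOCAL_PATH = '/local/data/'     # hypothetical local directory

# Open an SFTP session; sftp.get(remote, local) then downloads one file
transport = paramiko.Transport(('sftp.example.com', 22))
transport.connect(username='user', password='secret')
sftp = paramiko.SFTPClient.from_transport(transport)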

Upvotes: 0

Views: 1266

Answers (1)

Chengzhi

Reputation: 2591

Assuming you are using the PythonOperator, you can create multiple PythonOperator tasks, one per file. It would look something like this:

from airflow.operators.python_operator import PythonOperator
# in Airflow 2+: from airflow.operators.python import PythonOperator

def get_my_file(i):
    sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))

# Create one task per file at DAG-parse time (not inside a callable,
# or the tasks would never be registered); the scheduler can then run
# them in parallel, one worker slot per task.
for i in range(1, 11):
    task = PythonOperator(
        task_id='get_file_{}'.format(i),
        python_callable=get_my_file,
        op_args=[i],
        dag=dag)
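To also respect an upper limit of 10 simultaneous downloads, you can cap concurrency at the DAG level. A minimal sketch of the dag object the loop above references, assuming Airflow 1.x (where the DAG parameter is called concurrency; Airflow 2.2+ renames it to max_active_tasks), with a placeholder dag_id and schedule:

from datetime import datetime
from airflow import DAG

# Hypothetical DAG definition; dag_id, start_date and schedule are placeholders.
dag = DAG(
    dag_id='sftp_downloads',
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
    concurrency=10,  # at most 10 tasks of this DAG run at the same time
)

An Airflow pool with 10 slots (set pool='...' on each task) would work as well, and also caps concurrency across DAGs.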

Upvotes: 1
