Reputation: 671
I am building a system that is supposed to list files on a remote SFTP server and then download them locally. I want this to run in parallel, so that I can initiate one job per file to be downloaded, or set an upper limit of, say, 10 simultaneous downloads.
I am new to Airflow and still don't fully understand everything. I assume there is a solution for this, but I just can't figure it out.
This is the code. Currently I download all files in one Operator, but as far as I know it does not use multiple workers.
def transfer_files():
    # downloads all ten files sequentially inside a single task
    for i in range(1, 11):
        sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))
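For completeness, this is roughly how that single sequential task is wired up (the task id and the dag object here are just placeholders):

download_all = PythonOperator(
    task_id='download_all_files',   # placeholder id; runs all ten downloads in one task
    python_callable=transfer_files,
    dag=dag)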
Upvotes: 0
Views: 1266
Reputation: 2591
Assuming you are using the PythonOperator, you can create multiple PythonOperator tasks, one per file. It would look something like this:
def get_my_file(i):
    # each call downloads exactly one file, so every task stays small and independent
    sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))

def transfer_files():
    # create one PythonOperator per file; Airflow can then schedule them in parallel
    for i in range(1, 11):
        task = PythonOperator(
            task_id='test_{}.csv'.format(i),
            python_callable=get_my_file,
            op_args=[i],
            dag=dag)
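Here is a fuller sketch of how that could look as a complete DAG, assuming Airflow 2.x and paramiko for the SFTP connection; the host, credentials, paths and pool name below are placeholders. Creating the tasks at module level (when the DAG file is parsed) lets the scheduler run them in parallel, and assigning them to a pool with 10 slots is one way to cap the number of simultaneous downloads:

# Minimal sketch, assuming Airflow 2.x and paramiko; REMOTE_PATH, LOCAL_PATH,
# host, credentials and pool name are placeholders, not real values.
from datetime import datetime

import paramiko
from airflow import DAG
from airflow.operators.python import PythonOperator

REMOTE_PATH = '/remote/dir/'   # placeholder paths
LOCAL_PATH = '/local/dir/'

def get_my_file(i):
    # open a fresh SFTP connection inside the task so each worker is self-contained
    transport = paramiko.Transport(('sftp.example.com', 22))   # placeholder host
    transport.connect(username='user', password='secret')      # placeholder credentials
    sftp = paramiko.SFTPClient.from_transport(transport)
    try:
        sftp.get(REMOTE_PATH + 'test_{}.csv'.format(i), LOCAL_PATH + 'test_{}.csv'.format(i))
    finally:
        sftp.close()
        transport.close()

with DAG(
    dag_id='sftp_parallel_download',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # one task per file, created at parse time so the scheduler sees all of them
    for i in range(1, 11):
        PythonOperator(
            task_id='get_test_{}'.format(i),
            python_callable=get_my_file,
            op_args=[i],
            pool='sftp_pool',   # create this pool with 10 slots in the Airflow UI to cap concurrency
        )

With a LocalExecutor or CeleryExecutor, the ten tasks can then run on several worker slots at once instead of looping through the files inside a single task.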
Upvotes: 1