tsukikosagi

Reputation: 123

Passing a list to a Python callable in Airflow

I have a DAG that builds a list of CSV files and then reads them into data frames for import.

#CREATING CSV FILES
def csv_filess():
    print("Creating CSV files....")
    csv_files = []
    for file in os.listdir(dataset_dir):
        if file.endswith('.csv'):
            csv_files.append(file)
    print("***Step 3: CSV files created***")
    return csv_files

#CREATING DATAFRAME
def create_df(csv_files):      
    print("Creating dataframe....")     
    df = {}
    for file in csv_files:
        try:
            df[file] = pd.read_csv(data_path+file)
        except UnicodeDecodeError:
            df[file] = pd.read_csv(dataset_dir+file, encoding="ISO-8859-1")
    
    print("***Step 4: CSV files created in df!***")
    return df


t3 = PythonOperator(
    task_id='create_csv',
    python_callable=csv_filess, provide_context=True,
    dag=dag)

t4 = PythonOperator(
    task_id='create_df',
    python_callable=create_df,
    op_args = t3.output,
    provide_context=True,
    dag=dag)

But I get an error:

create_df() takes 1 positional argument but 4 were given

I think it's because I have to call it this way first:

csv_files = csv_filess()

But how do I do that in an Airflow task?

Upvotes: 1

Views: 845

Answers (1)

Bas Harenslak

Reputation: 3094

Returning a value from a PythonOperator automatically stores the output as an XCom with key "return_value". So you'll get an XCom from task create_csv with key return_value and value ["file1.csv", "file2.csv", ...]. You can inspect all XComs in Airflow under Admin -> XComs, or per task by clicking a task -> Instance Details -> XCom.
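For illustration only, here is a minimal sketch of pulling that XCom by hand from a downstream callable. You don't need this once you use t3.output; it just makes the mechanism concrete. It assumes Airflow 2.x, where the task instance is injected for a parameter named ti, and create_df_manual is a hypothetical name:

def create_df_manual(ti, **kwargs):
    # Read the "return_value" XCom that the create_csv task pushed.
    csv_files = ti.xcom_pull(task_ids="create_csv", key="return_value")
    print(csv_files)  # e.g. ["file1.csv", "file2.csv", ...]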

In your create_df task, you then pass the output of create_csv using t3.output, which is a reference to that previously created XCom. When op_args is given a list, Airflow unpacks it into separate positional arguments, so your callable has to accept a variable number of arguments with a *:

def create_df(*csv_files):
    ...

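Applied to your DAG, a minimal sketch of the fix could look like this. It assumes dataset_dir is the same directory the files were listed from, and drops provide_context, which is deprecated and no longer needed on Airflow 2.x (which using t3.output implies):

def create_df(*csv_files):
    # csv_files is a tuple of the filenames Airflow unpacked from the XCom list.
    df = {}
    for file in csv_files:
        try:
            df[file] = pd.read_csv(dataset_dir + file)
        except UnicodeDecodeError:
            df[file] = pd.read_csv(dataset_dir + file, encoding="ISO-8859-1")
    return df

t4 = PythonOperator(
    task_id='create_df',
    python_callable=create_df,
    op_args=t3.output,  # resolves to the list returned by create_csv
    dag=dag)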
Two notes:

You might be interested in exploring Airflow's TaskFlow API, which would reduce the boilerplate code. Your code would then look like this:

from airflow.decorators import task

with DAG(...) as dag:

    @task
    def csv_filess():
        ...

    @task
    def create_df(csv_files):
        ...

    create_df(csv_filess())

(Note that here create_df does not require unpacking.)
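For completeness, a rough end-to-end version of that could look like the following. This is only a sketch: it assumes Airflow 2.x, and the DAG id, schedule, and dataset_dir value are placeholders:

import os
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.decorators import task

dataset_dir = "/path/to/datasets/"  # placeholder, point this at your CSV directory

with DAG("csv_to_df", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

    @task
    def csv_filess():
        # The returned list is stored as this task's "return_value" XCom.
        return [f for f in os.listdir(dataset_dir) if f.endswith(".csv")]

    @task
    def create_df(csv_files):
        # Receives the whole list as a single argument, so no * unpacking is needed.
        df = {}
        for file in csv_files:
            try:
                df[file] = pd.read_csv(dataset_dir + file)
            except UnicodeDecodeError:
                df[file] = pd.read_csv(dataset_dir + file, encoding="ISO-8859-1")
        # NB: returning DataFrames relies on XCom being able to store them; see the note below.
        return df

    create_df(csv_filess())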

And lastly, note that values returned from a PythonOperator are automatically stored as XComs, which by default live in the Airflow metastore. That's fine if it's intended or if a custom XCom backend is configured, but I'm a bit wary of returning Pandas DataFrames this way, as they could potentially be very large.
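If you do want to ship large objects like DataFrames between tasks, one option is a custom XCom backend that writes the payload to external storage and only keeps a reference in the metastore. Very roughly, the idea is something like the sketch below; the exact serialize_value/deserialize_value signatures vary between Airflow versions, and the class name and bucket path are placeholders:

# Registered via airflow.cfg (or the AIRFLOW__CORE__XCOM_BACKEND env var):
# [core]
# xcom_backend = my_project.xcom_backends.PandasXComBackend

import uuid

import pandas as pd
from airflow.models.xcom import BaseXCom


class PandasXComBackend(BaseXCom):
    """Store DataFrames as Parquet files elsewhere, keep only the path as the XCom value."""

    @staticmethod
    def serialize_value(value, **kwargs):
        if isinstance(value, pd.DataFrame):
            path = f"s3://my-xcom-bucket/{uuid.uuid4()}.parquet"  # placeholder bucket
            value.to_parquet(path)  # assumes s3fs is installed
            value = path
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith("s3://my-xcom-bucket/"):
            value = pd.read_parquet(value)
        return value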

Upvotes: 1
