I have a DAG that builds a list of CSV files and then loads them into DataFrames for import.
import os

import pandas as pd
from airflow.operators.python import PythonOperator

# LISTING CSV FILES
def csv_filess():
    print("Creating CSV files....")
    csv_files = []
    for file in os.listdir(dataset_dir):
        if file.endswith('.csv'):
            csv_files.append(file)
    print("***Step 3: CSV files created***")
    return csv_files

# CREATING DATAFRAME
def create_df(csv_files):
    print("Creating dataframe....")
    df = {}
    for file in csv_files:
        path = os.path.join(dataset_dir, file)
        try:
            df[file] = pd.read_csv(path)
        except UnicodeDecodeError:
            # Fall back to Latin-1 for files that are not valid UTF-8
            df[file] = pd.read_csv(path, encoding="ISO-8859-1")
    print("***Step 4: CSV files created in df!***")
    return df
t3 = PythonOperator(
    task_id='create_csv',
    python_callable=csv_filess,
    provide_context=True,
    dag=dag)

t4 = PythonOperator(
    task_id='create_df',
    python_callable=create_df,
    op_args=t3.output,
    provide_context=True,
    dag=dag)
But I get an error:
create_df() takes 1 positional argument but 4 were given
I suspect it's because I first have to call it like this:
csv_files = csv_filess()
But how do I express that in an Airflow task?
Returning a value from a PythonOperator automatically stores the output as an XCom with key "return_value". So you'll get an XCom from task create_csv with key return_value and value ["file1.csv", "file2.csv", ...]. You can inspect all XComs in Airflow under Admin -> XComs, or per task by clicking a task -> Instance Details -> XCom.
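To make that mechanism concrete, here is a toy model of it in plain Python. ToyXComStore and run_task are hypothetical names invented for this sketch, not Airflow's API; the real store is a table in the Airflow metastore (or a custom XCom backend), not a dict.

```python
class ToyXComStore:
    """Hypothetical in-memory stand-in for the XCom table; not Airflow's API."""

    def __init__(self):
        self._rows = {}

    def push(self, task_id, key, value):
        # One entry per (task_id, key) pair
        self._rows[(task_id, key)] = value

    def pull(self, task_id, key="return_value"):
        return self._rows[(task_id, key)]


def run_task(store, task_id, python_callable):
    # Mimics the behaviour described above: whatever the callable returns
    # is stored automatically under the key "return_value".
    result = python_callable()
    store.push(task_id, "return_value", result)
    return result


store = ToyXComStore()
run_task(store, "create_csv", lambda: ["file1.csv", "file2.csv"])
print(store.pull("create_csv"))  # ['file1.csv', 'file2.csv']
```

A downstream task that references t3.output is, conceptually, asking for the equivalent of store.pull("create_csv") at run time.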
In your create_df task, you then pass the output of create_csv using t3.output. This is a reference to the previously created XCom. op_args is unpacked into positional arguments, and since t3.output resolves to the list itself, each file name becomes a separate argument to create_df (four CSV files, hence "4 were given"). So you'll have to accept multiple arguments with a * to do the trick:
def create_df(*csv_files):
    ...
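The error can be reproduced without Airflow at all. This is a minimal plain-Python sketch, assuming (as PythonOperator does internally) that the callable is invoked as python_callable(*op_args, **op_kwargs) after t3.output has been resolved to the list of file names; call_like_python_operator and the file names are hypothetical.

```python
def create_df(csv_files):
    # Original signature: exactly one positional parameter
    return len(csv_files)


def create_df_star(*csv_files):
    # Signature suggested above: collects any number of positional arguments
    return list(csv_files)


def call_like_python_operator(python_callable, op_args, op_kwargs=None):
    # Hypothetical stand-in for the operator: op_args is unpacked with *
    return python_callable(*op_args, **(op_kwargs or {}))


files = ["file1.csv", "file2.csv", "file3.csv", "file4.csv"]

try:
    call_like_python_operator(create_df, files)
except TypeError as err:
    print(err)  # create_df() takes 1 positional argument but 4 were given

print(call_like_python_operator(create_df_star, files))  # ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
print(call_like_python_operator(create_df, [files]))     # 4
```

The last call corresponds to wrapping the reference in a list, op_args=[t3.output], which should likewise hand the whole list over as a single argument and let the original def create_df(csv_files) signature work unchanged.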
Two notes:
You might be interested in exploring Airflow's TaskFlow API, which reduces boilerplate code. Your code would look like this:
from airflow import DAG
from airflow.decorators import task

with DAG(...) as dag:

    @task
    def csv_filess():
        ...

    @task
    def create_df(csv_files):
        ...

    create_df(csv_filess())
(Note that here create_df does not require unpacking: the output of csv_filess() is passed in as a single argument.)
Lastly, note that values returned from PythonOperators are automatically stored as XComs (by default in the Airflow metastore). That's fine if intended, or if a custom XCom backend is configured, but I'm wary of returning Pandas DataFrames, as these can be very large.
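One common way around that is to let only small values such as file names cross task boundaries and load the data inside the task that actually uses it. The sketch below assumes that pattern; list_csv_files, read_rows and count_rows are hypothetical helpers, and it uses Python's csv module in place of pandas so it runs with the standard library only, keeping the question's UTF-8 then ISO-8859-1 fallback.

```python
import csv
import json
import os
import tempfile


def list_csv_files(dataset_dir):
    # Hypothetical upstream task: returns only file names, so the XCom stays tiny
    return sorted(f for f in os.listdir(dataset_dir) if f.endswith(".csv"))


def read_rows(path):
    # Same fallback idea as the question: try UTF-8 first, then ISO-8859-1
    try:
        with open(path, newline="", encoding="utf-8") as fh:
            return list(csv.reader(fh))
    except UnicodeDecodeError:
        with open(path, newline="", encoding="ISO-8859-1") as fh:
            return list(csv.reader(fh))


def count_rows(dataset_dir, csv_files):
    # Hypothetical downstream task: loads the data itself and returns only a small summary
    return {f: len(read_rows(os.path.join(dataset_dir, f))) for f in csv_files}


with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "a.csv"), "w", encoding="utf-8", newline="") as fh:
        fh.write("id,name\n1,x\n")
    with open(os.path.join(d, "b.csv"), "wb") as fh:
        # Latin-1 bytes that are not valid UTF-8, to trigger the fallback
        fh.write("id,name\n1,caf\xe9\n2,y\n".encode("ISO-8859-1"))
    files = list_csv_files(d)
    summary = count_rows(d, files)
    print(json.dumps(files))  # ["a.csv", "b.csv"]
    print(summary)            # {'a.csv': 2, 'b.csv': 3}
```

Only the file list and the summary dict would end up as XComs here; the row data never leaves the task that reads it.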