smullan

Reputation: 152

Airflow not failing on subtask error

Perhaps someone can tell me what I'm doing wrong here. I have a task in Airflow that runs a command, and in the logs I get this error:

[2018-05-30 11:22:43,814] {models.py:1428} INFO - Executing <Task(PythonOperator): computer_unload_and_load> on 2018-05-30 15:22:41.595535
[2018-05-30 11:22:43,814] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run copy_kiosk_status computer_unload_and_load 2018-05-30T15:22:41.595535 --job_id 23 --raw -sd DAGS_FOLDER/copy_poc.py']
[2018-05-30 11:22:44,367] {base_task_runner.py:98} INFO - Subtask: [2018-05-30 11:22:44,367] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-30 11:22:44,412] {base_task_runner.py:98} INFO - Subtask: [2018-05-30 11:22:44,412] {models.py:189} INFO - Filling up the DagBag from /other/airflow/dags/copy_kiosk.py
[2018-05-30 11:22:44,570] {cli.py:374} INFO - Running on host [redacted]
[2018-05-30 11:22:46,967] {base_task_runner.py:98} INFO - Subtask: ERROR:  schema "dw2" does not exist
[2018-05-30 11:22:46,969] {base_task_runner.py:98} INFO - Subtask: [2018-05-30 11:22:46,968] {python_operator.py:90} INFO - Done. Returned value was: None

Here is the task:

computer_load_task = PythonOperator(
    task_id='computer_unload_and_load',
    python_callable=unload_and_load.unload_and_load_table,
    op_args=(source_host, source_db, "public", "computer", "computer_unload_task"),
    dag=dag
)

Here is the function it is calling:

def load_table(host, db, schema, table, unload_task_id=False, file_path=False, **kwargs):
    """ load a csv file into a table
        if no file_path is given, uses XCOM to get the file_name returned by the unload task"""
    try:
        if not file_path:
            file_path = kwargs['ti'].xcom_pull(task_ids=unload_task_id)

        load_cmd = "\\copy {}.{} FROM {} WITH (FORMAT CSV,  NULL '^',  HEADER)".format(schema, table, file_path)
        command = [ "psql",
                    "-U", "root",
                    "-h", host,
                    "-d", db,
                    "-c", load_cmd
                  ]
        subprocess.run(command)

    except:
        raise

Obviously I know how to fix the error itself (I have the wrong schema in there), but I want to know why the task succeeds in Airflow rather than failing. I must be missing something obvious.

Upvotes: 2

Views: 727

Answers (1)

tobi6

Reputation: 8239

Your code example looks fine apart from one thing: provide_context=True is missing from your task. The PythonOperator needs it in order to pass the task context (including kwargs['ti']) to your callable.

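Besides that, I think this is connected to how subprocess handles errors. By default, subprocess.run() does not raise when the command exits with a non-zero status; it simply returns, so your function finishes normally and Airflow sees a success. You could try passing the argument check=True so that subprocess raises an exception (CalledProcessError) if something goes wrong.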
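To illustrate the check=True suggestion, here is a minimal sketch. The helper name run_or_raise is hypothetical, and the failing command is a stand-in child Python process rather than your psql call. It also assumes that psql exits with a non-zero status when the \copy fails, which is worth confirming on your setup.

```python
import subprocess
import sys


def run_or_raise(command):
    """Run a command and raise CalledProcessError on a non-zero exit status."""
    # check=True turns a non-zero exit status into an exception
    # instead of a silently returned CompletedProcess.
    return subprocess.run(command, check=True)


# Stand-in for a failing psql call: a child process that exits with status 1.
failing_command = [sys.executable, "-c", "import sys; sys.exit(1)"]

try:
    run_or_raise(failing_command)
except subprocess.CalledProcessError as exc:
    # Inside an Airflow callable you would let this propagate
    # so that the task is marked as failed.
    print("command failed with exit status", exc.returncode)
```

In the task itself you would not catch the exception; letting it propagate out of the callable is what makes Airflow fail the task.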

Alternatively, you could inspect the returncode of the CompletedProcess object that subprocess.run() returns (or call its check_returncode() method) and raise an exception of your own if it is non-zero.
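A rough sketch of the manual variant follows. LoadFailed and run_and_verify are hypothetical names made up for this example, and the failing command is again a stand-in child Python process, not psql.

```python
import subprocess
import sys


class LoadFailed(Exception):
    """Hypothetical exception type for a failed load command."""


def run_and_verify(command):
    """Run a command and raise LoadFailed if it exits with a non-zero status."""
    completed = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    # completed.check_returncode() would raise CalledProcessError here;
    # checking returncode by hand lets us raise our own exception instead.
    if completed.returncode != 0:
        message = completed.stderr.decode(errors="replace").strip()
        raise LoadFailed(
            "command exited with status {}: {}".format(completed.returncode, message)
        )
    return completed


# Stand-in for a failing command: writes to stderr and exits with status 3.
failing_command = [
    sys.executable,
    "-c",
    "import sys; sys.stderr.write('boom'); sys.exit(3)",
]

try:
    run_and_verify(failing_command)
except LoadFailed as exc:
    print(exc)
```

Capturing stderr this way means the command's error text ends up in the exception message, which makes the failure easier to find in the task log.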

Here are the Python 3 docs for subprocess: https://docs.python.org/3/library/subprocess.html

If that doesn't work, it might be a good idea to test the exception handling separately: create a task that does nothing but raise an exception and see whether Airflow marks it as failed.
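Such a test callable could be as small as the sketch below. The name always_fail is hypothetical; you would pass it as python_callable to a PythonOperator in your DAG, which is left out here so the snippet runs without Airflow installed.

```python
def always_fail(**kwargs):
    """Callable that fails unconditionally, to check how Airflow reacts."""
    # Any uncaught exception raised by the callable should fail the task.
    raise RuntimeError("deliberate failure to test Airflow's error handling")


# Outside Airflow, calling it directly just shows the exception being raised.
try:
    always_fail()
except RuntimeError as exc:
    print("raised:", exc)
```

If a task built on this callable shows up as failed, exception handling in Airflow works as expected and the problem lies in the subprocess call.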

Upvotes: 2
