Reputation: 8273
I had implemented a test case for running an individual DAG, but it does not seem to work in 1.9; this may be due to the stricter pool handling introduced in Airflow 1.8. I am trying to run the test case below:
import unittest
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


class DAGTest(unittest.TestCase):

    def make_tasks(self):
        dag = DAG('test_dag', description='a test',
                  schedule_interval='@once',
                  start_date=datetime(2018, 6, 26),
                  catchup=False)
        du1 = DummyOperator(task_id='dummy1', dag=dag)
        du2 = DummyOperator(task_id='dummy2', dag=dag)
        du3 = DummyOperator(task_id='dummy3', dag=dag)
        du1 >> du2 >> du3
        dag.run()

    def test_execute(self):
        self.make_tasks()
Running the test fails with:

Dependencies not met for <TaskInstance: test_dag.dummy3 2018-06-26 00:00:00 [upstream_failed]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'skipped': 0L, 'successes': 0L, 'failed': 0L, 'upstream_failed': 1L, 'done': 1L, 'total': 1}, upstream_task_ids=['dummy2']
What am I doing wrong? I have tried both LocalExecutor and SequentialExecutor.
Environment:
Python 2.7
Airflow 1.9
I believe it is trying to execute all the tasks simultaneously without respecting the dependencies. Note: similar code used to work in Airflow 1.7.
Upvotes: 3
Views: 4076
Reputation: 5923
Here's a function you can use in a pytest test case that will run the tasks of your DAG in order.
from datetime import timedelta

import pytest
from unittest import TestCase


@pytest.fixture
def test_dag(dag):
    dag._schedule_interval = timedelta(days=1)  # override cuz @once gets skipped

    done = set([])

    def run(key):
        task = dag.task_dict[key]
        for k in task._upstream_task_ids:
            run(k)
        if key not in done:
            print(f'running task {key}...')
            date = dag.default_args['start_date']
            task.run(date, date, ignore_ti_state=True)
            done.add(key)

    for k, _ in dag.task_dict.items():
        run(k)
You can then use test_dag(dag) instead of dag.run() in your test.
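For example, a minimal sketch assuming the helper above is in scope; note that it reads dag.default_args['start_date'], so the start date goes into default_args here rather than being passed directly to the DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def test_execute():
    dag = DAG('test_dag', description='a test',
              schedule_interval='@once',
              default_args={'start_date': datetime(2018, 6, 26)},
              catchup=False)
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3
    test_dag(dag)  # runs dummy1, dummy2, dummy3 in dependency order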
You'll need to make sure the logging in your custom operators uses self.log.info() rather than logging.info() or print(), or it won't show up.
You may also need to run your test using python -m pytest -s test_my_dag.py, as without the -s flag pytest captures stdout and Airflow's output won't be displayed.
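For illustration, here's a minimal sketch of a made-up operator that logs through self.log, so its output lands in the task log and, with -s, in the pytest output:

from airflow.models import BaseOperator


class MyExampleOperator(BaseOperator):
    """Hypothetical operator, only to illustrate the logging advice above."""

    def execute(self, context):
        # self.log is the per-task logger Airflow attaches to operators,
        # so this line shows up in the task log (and on the console with -s).
        self.log.info('running for execution_date %s', context['execution_date'])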
I'm still trying to figure out how to handle inter-DAG dependencies.
Upvotes: 0
Reputation: 4058
I'm not familiar with Airflow 1.7, but I guess it didn't have the same "DagBag" concept that Airflow 1.8 and upwards have.
You can't run a DAG that you have created like this, because dag.run() starts a new Python process that has to find the DAG in the dags folder it parses from disk, and it can't. This is mentioned in the output (but you didn't include the full error message/output).
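As a quick check (a minimal sketch), you can build a DagBag yourself and see whether the DAG is visible; a DAG defined only inside a test file won't be:

from airflow.models import DagBag

dagbag = DagBag()  # parses the configured dags folder on disk
print(dagbag.import_errors)
print('test_dag' in dagbag.dags)  # False for a DAG defined only in a test file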
What are you trying to test by creating a dag in the test files? Is it a custom operator? Then you would be better off testing that directly. For instance, here is how I test a custom operator stand-alone:
import unittest

from airflow import DAG
from airflow.models import TaskInstance

import myplugin

# TEST_DAG_ID and DEFAULT_DATE are defined elsewhere in the test module


class MyPluginTest(unittest.TestCase):

    def setUp(self):
        dag = DAG(TEST_DAG_ID, schedule_interval='* * * * Thu',
                  default_args={'start_date': DEFAULT_DATE})
        self.dag = dag
        self.op = myplugin.FindTriggerFileForExecutionPeriod(
            dag=dag,
            task_id='test',
            prefix='s3://bucket/some/prefix',
        )
        self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)

        # Other S3 setup here, specific to my test

    def test_execute_no_trigger(self):
        with self.assertRaises(RuntimeError):
            self.ti.run(ignore_ti_state=True)

        # It shouldn't have anything in XCom
        self.assertEqual(
            self.ti.xcom_pull(task_ids=self.op.task_id),
            None
        )
Upvotes: 1