Reputation: 740
I ran the following test command:
airflow test events {task_name_redacted} 2018-12-12
...and got the following output:
Dependencies not met for <TaskInstance: events.{redacted} 2018-12-12T00:00:00+00:00 [None]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (16) for this task's DAG 'events' has been reached.
[2019-01-17 19:47:48,978] {models.py:1556} WARNING -
--------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 6. State set to NONE.
--------------------------------------------------------------------------------
[2019-01-17 19:47:48,978] {models.py:1559} INFO - Queuing into pool None
My Airflow is configured with a maximum concurrency of 16. Does this mean that I cannot test a task while the DAG is currently running and has used all of its task slots?
Also, it was a little unclear from the docs, but does airflow test actually execute the task? For example, if it were a SparkSubmitOperator, would it actually submit the job?
Upvotes: 4
Views: 3358
Reputation: 11607
While I have yet to reach the phase of deployment where concurrency matters, the docs do give a fairly good indication of the problem at hand.
Since only one scheduler is running at any point in time (and you shouldn't be running multiple anyway), it appears that this limit applies to DAG runs collectively, irrespective of whether they are live runs or test runs. So that is certainly a hurdle.
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
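If the limit only bites for this one DAG, a per-DAG override may be enough instead of (or alongside) a global bump: in Airflow 1.x the DAG constructor accepts a concurrency argument that defaults to dag_concurrency. A minimal sketch, assuming the dag id from your question and placeholder dates/schedule:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Sketch only: 'events' mirrors the DAG name from the question; start_date
# and schedule_interval are placeholders. In Airflow 1.x, `concurrency` caps
# how many task instances of *this* DAG may run at once, overriding the
# global dag_concurrency setting for this DAG.
dag = DAG(
    dag_id="events",
    start_date=datetime(2018, 12, 1),
    schedule_interval="@daily",
    concurrency=32,        # per-DAG cap on simultaneously running tasks
    max_active_runs=1,     # optional: also limit simultaneous DAG runs
)

some_task = DummyOperator(task_id="some_task", dag=dag)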
But beware that merely increasing this number is not enough (assuming you have big-enough boxes for hefty / multiple workers); several other configurations will have to be tweaked as well to achieve the kind of parallelism I sense you want.
They are all listed under the [core] section:
# The amount of parallelism as a setting to the executor. This defines the max number of task instances that should run simultaneously on this airflow installation
parallelism = 32
# When not using pools, tasks are run in the "default pool", whose size is guided by this config element
non_pooled_task_slot_count = 128
# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
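Related to non_pooled_task_slot_count (and the "Queuing into pool None" line in your log): one way to stop ad-hoc runs from fighting with the live DAG over the same default slots is to give the task its own pool. A rough sketch, assuming a hypothetical pool named adhoc_tests that you have already created (Admin -> Pools in the UI, or via the airflow pool CLI):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Sketch only: 'adhoc_tests' is a hypothetical pool name and must already
# exist. Tasks assigned to a pool consume that pool's slots instead of
# competing for the default / non-pooled slots mentioned above.
dag = DAG(
    dag_id="pool_example",              # hypothetical id for illustration
    start_date=datetime(2018, 12, 1),
    schedule_interval=None,
)

isolated_task = BashOperator(
    task_id="isolated_task",
    bash_command="echo 'running in my own pool'",
    pool="adhoc_tests",                 # slots come from this pool, not the default one
    dag=dag,
)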
But we are still not there, because once you spawn so many tasks simultaneously, the backend metadata db will start choking. While this is likely a minor problem (and might not affect you unless you have some really huge DAGs or a very large number of Variable interactions in your tasks), it is still worth noting as a potential roadblock:
# The SqlAlchemy pool size is the maximum number of database connections in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5
# The SqlAlchemy pool recycle is the number of seconds a connection can be idle in the pool before it is invalidated. This config does not apply to sqlite. If the number of DB connections is ever exceeded, a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800
# How many seconds to retry re-establishing a DB connection after disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300
Needless to say, all this is pretty much futile unless you pick the right executor; SequentialExecutor, in particular, is intended only for testing:
# The executor class that airflow should use. Choices include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = SequentialExecutor
Finally, BaseOperator params like depends_on_past and wait_for_downstream are there to spoil the party as well; a quick sketch of where they live is below.
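A hedged sketch of the default_args that control those two behaviours (both default to False; they are shown explicitly here only to point at the knobs, and the dag id is hypothetical):

from datetime import datetime

from airflow import DAG

# Sketch only: depends_on_past makes a task wait for its own previous run to
# succeed; wait_for_downstream additionally waits for the previous run's
# immediate downstream tasks. Either can block ad-hoc / test executions.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "wait_for_downstream": False,
}

dag = DAG(
    dag_id="events_default_args_example",   # hypothetical id for illustration
    default_args=default_args,
    start_date=datetime(2018, 12, 1),
    schedule_interval="@daily",
)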
For the Airflow + Spark combination, see: How to submit Spark jobs to EMR cluster from Airflow?
(Pardon me if the answer confused you more than you already were, but..)
Upvotes: 4