Reputation: 327
Sensors in Airflow are a type of operator that keeps running until a certain criterion is met, but they consume a full worker slot the whole time they wait. I'm curious whether people have found reliable, more efficient ways of implementing this.
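For reference, here is a minimal sketch of the kind of sensor I mean (class name and file path are made up, and I'm assuming the Airflow 1.10.x import path): Airflow calls poke() repeatedly until it returns True, and in the default mode the task holds its worker slot for the entire wait.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os

class FileExistsSensor(BaseSensorOperator):
    """Illustrative sensor: succeeds once a file appears on disk."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(FileExistsSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Called every poke_interval seconds until it returns True;
        # meanwhile the task occupies a worker slot.
        return os.path.exists(self.filepath)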
A few ideas on my mind
Other relevant links:
How to wait for an asynchronous event in a task of a DAG in a workflow implemented using Airflow?
Upvotes: 7
Views: 5892
Reputation: 6420
Cross-DAG dependencies are feasible per this doc.
The criterion can be specified as a separate task in a separate DAG, so that when it is met for a given date, the child task is allowed to run.
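For illustration, a minimal sketch of that pattern using ExternalTaskSensor (the DAG and task names here are made up): the sensor in the child DAG waits until the named task in the parent DAG has succeeded for the same execution date.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime

with DAG("child_dag", start_date=datetime(2019, 1, 1),
         schedule_interval="@daily") as dag:
    # Waits for parent_dag.produce_data to succeed for the same execution date.
    wait_for_parent = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="parent_dag",
        external_task_id="produce_data",
        poke_interval=60,
    )
    run_child = DummyOperator(task_id="run_child")
    wait_for_parent >> run_child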
Upvotes: 0
Reputation: 1461
The new version of Airflow, namely 1.10.2, provides a new option for sensors which I think addresses your concern:
mode (str) – How the sensor operates. Options are: { poke | reschedule }, default is poke. When set to poke the sensor is taking up a worker slot for its whole execution time and sleeps between pokes. Use this mode if the expected runtime of the sensor is short or if a short poke interval is required. When set to reschedule the sensor task frees the worker slot when the criteria is not yet met and it’s rescheduled at a later time. Use this mode if the expected time until the criteria is met is expected to be quite long. The poke interval should be more than one minute to prevent too much load on the scheduler.
Here is the link to the doc.
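A minimal sketch of how this looks in a DAG, assuming Airflow 1.10.2+ (file path, intervals and DAG name are just placeholders); mode lives on BaseSensorOperator, so any sensor accepts it:

from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

with DAG("wait_for_file", start_date=datetime(2019, 1, 1),
         schedule_interval="@daily") as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/ready.flag",  # placeholder path
        poke_interval=5 * 60,                  # more than one minute, per the docs
        timeout=12 * 60 * 60,                  # fail the sensor after 12 hours
        mode="reschedule",                     # free the worker slot between pokes
    )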
Upvotes: 6
Reputation: 10220
I think you need to step back and question why it's a problem that a sensor consumes a full worker slot.
Airflow is a scheduler, not a resource allocator. Using worker concurrency, pools and queues, you can limit resource usage, but only very crudely. In the end, Airflow naively assumes a sensor will use the same resources on worker nodes as a BashOperator that spawns a multi-process genome sequencing utility. But sensors are cheap and sleep 99.9% of the time, so that is a bad assumption.
So, if you want to solve the problem of sensors consuming all your worker slots, just bump your worker concurrency. You should be able to have hundreds of sensors running concurrently on a single worker.
If you then get problems with very uneven workload distribution on your cluster nodes and nodes with dangerously high system load, you can limit the number of expensive jobs using either pools (a global limit) or dedicated queues: run separate workers that subscribe only to the expensive queue (
airflow worker --queues my_expensive_queue
) and give them a low concurrency setting. This creates a per-node limit. If you have more complex requirements than that, then consider shipping all non-trivial compute jobs to a dedicated resource allocator, e.g. Apache Mesos, where you can specify the exact CPU, memory and other requirements to make sure your cluster load is distributed more efficiently on each node than Airflow will ever be able to do.
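For example, a sketch of routing an expensive job to such a queue (queue, DAG and command names are made up); the cheap sensors stay on the default queue, where worker concurrency can be set high:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

with DAG("expensive_jobs", start_date=datetime(2019, 1, 1),
         schedule_interval="@daily") as dag:
    heavy_job = BashOperator(
        task_id="heavy_job",
        bash_command="python /opt/jobs/heavy_job.py",  # placeholder command
        # Only picked up by workers started with:
        #   airflow worker --queues my_expensive_queue
        queue="my_expensive_queue",
    )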
Upvotes: 6