Alessandro S.

Reputation: 1043

Custom sensor in Apache Airflow

Airflow Version : 1.10.0

Following this guide I have created a sensors folder and my_sensor.py, and I have added the import and use of the sensor in my DAG:

[...]
from sensors.my_sensor import MySensor

[...]
wait_something = MySensor(task_id='taskA')
[...]

Unfortunately Airflow complains when trying to reload the DAG, as follows:

Broken DAG: [/usr/local/airflow/dags/my_dag.py] No module named 'sensors'

I have also restarted the server just to be sure.

The following sentence (from the aforementioned guide) makes me wonder if I failed to update this PATH variable, but I cannot find it anywhere and have no clue:

When Airflow is running, it will add dags/, plugins/, and config/ to PATH

PS: I don't want to use Plugins; the point of that guide is to provide an alternative way to load custom operators/sensors/hooks without using Plugins.

Upvotes: 0

Views: 4460

Answers (3)

paradox

Reputation: 861

I just had the same problem and it turned out that I hadn't mounted my plugins folder into my container. Once I mounted it all worked.
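If you are running Airflow in a container, the fix is to bind-mount the host folders into it. A minimal sketch, assuming the puckel/docker-airflow image and the /usr/local/airflow home seen in the question's error message; adjust image and paths for your setup:

```shell
# Mount the host's dags/ and plugins/ folders into the container
# so Airflow's webserver/scheduler can see them.
docker run -d \
  -v "$(pwd)/dags:/usr/local/airflow/dags" \
  -v "$(pwd)/plugins:/usr/local/airflow/plugins" \
  puckel/docker-airflow webserver
```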

Upvotes: 0

Rohit ramwal

Reputation: 1

Try something like this:

from airflow.plugins_manager import AirflowPlugin
from airflow.contrib.sensors.sftp_sensor import SFTPSensor

# Registers SFTPSensor under the plugin name "sftp_sensor", so DAGs
# can import it as: from airflow.sensors.sftp_sensor import SFTPSensor
class SFTPSensorPlugin(AirflowPlugin):
    name = "sftp_sensor"
    sensors = [SFTPSensor]

Upvotes: 0

The guide you reference appears to be incorrect. PATH is the environment variable listing the directories that are searched for executables such as the Python interpreter, grep, or vim. PYTHONPATH is the environment variable that Python uses to extend its module search path.

What Airflow is most likely updating is sys.path, the module search path at runtime, which includes Python's regular module directories as well as the contents of PYTHONPATH. sys.path is particularly useful because it is user-mutable at runtime.

To check what your module search path looks like you could add this to the top of your DAG:

import sys
print(sys.path)

Even for a broken DAG, if that snippet is the first part of the script, the print output will appear in the webserver/scheduler's DAG parsing log. You can also check the default sys.path without any Airflow changes by running a command like:

# python3 -c 'import sys; print(sys.path)'
['', '/usr/lib/python38.zip', '/usr/lib/python3.8', '/usr/lib/python3.8/lib-dynload', '/home/username/.local/lib/python3.8/site-packages', '/usr/local/lib/python3.8/dist-packages', '/usr/lib/python3/dist-packages']
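To see how PYTHONPATH feeds into sys.path, a minimal sketch (the directory name is arbitrary and does not need to exist):

```python
import os
import subprocess
import sys

# Launch a child interpreter with PYTHONPATH set and check that the
# directory appears in its module search path.
env = dict(os.environ, PYTHONPATH="/tmp/my_modules")
out = subprocess.check_output(
    [sys.executable, "-c", "import sys; print('/tmp/my_modules' in sys.path)"],
    env=env,
)
print(out.decode().strip())  # True
```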

Now, you may not have permission to write files to system directories like /usr/local/lib/python3.8/dist-packages, or to restart Airflow with a custom PYTHONPATH. But another option is to place your custom sensor in a directory you do have write access to, and update sys.path before that path is imported in your DAG. Something like this:

import sys
sys.path.append("/some/arbitrary/path/plugins")

from sensors.my_sensor import MySensor
# ...do something with MySensor
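The same mechanism works without Airflow; a self-contained sketch that builds a throwaway sensors/my_sensor.py under a temp directory and imports it after extending sys.path:

```python
import os
import sys
import tempfile

# Create a throwaway package layout: <tmpdir>/sensors/my_sensor.py
root = tempfile.mkdtemp()
pkg = os.path.join(root, "sensors")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "my_sensor.py"), "w") as f:
    f.write("class MySensor:\n    name = 'demo'\n")

# Make the temp directory importable, then import normally.
sys.path.append(root)
from sensors.my_sensor import MySensor
print(MySensor.name)  # demo
```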

Upvotes: 1
