Stephen

Reputation: 8860

Cannot turn on Airflow's Smart Sensors feature (custom sensor)

I am trying to use Airflow's Smart Sensors feature on a custom sensor operator (i.e. subclass of BaseSensorOperator). The documentation on this feature is pretty sparse right now.

The shard jobs (smart_sensor_group_shard_[x]) are running, but I don't think they are picking up my sensors: their logs say Loaded 0 sensor_works.

I think the problem is that BaseSensorOperator.is_smart_sensor_compatible() is returning False even though I have turned the feature on in my configuration. Here is my configuration:

[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True

But here are the logs from MyCustomSensor:

INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible

As you can see, the operator still sees Airflow's default values for those settings. I don't know why this inconsistency happens, because the UI shows the configuration set correctly.
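For context, each Airflow process resolves these two settings on its own: an AIRFLOW__SMART_SENSOR__<KEY> environment variable takes precedence over airflow.cfg, which takes precedence over the built-in default. The standard-library sketch below approximates that lookup so you can compare what a config file says with what a process would end up with. It is not Airflow's own code; the helper read_smart_sensor_settings is hypothetical, and the defaults in it are assumed from the logged values above.

```python
import configparser
import os
from typing import Mapping, Optional, Set, Tuple

# Assumed built-in defaults, matching the values in the logs above.
DEFAULTS = {
    "use_smart_sensor": "False",
    "sensors_enabled": "NamedHivePartitionSensor",
}


def read_smart_sensor_settings(
    cfg_text: str, environ: Optional[Mapping[str, str]] = None
) -> Tuple[bool, Set[str]]:
    """Approximate lookup order: env var, then airflow.cfg text, then default."""
    environ = os.environ if environ is None else environ
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)

    def lookup(key: str) -> str:
        # Airflow's env-var convention: AIRFLOW__{SECTION}__{KEY}.
        env_name = f"AIRFLOW__SMART_SENSOR__{key.upper()}"
        if env_name in environ:
            return environ[env_name]
        return parser.get("smart_sensor", key, fallback=DEFAULTS[key])

    # Simplification: Airflow also rejects values that are not boolean-like.
    enabled = lookup("use_smart_sensor").strip().lower() in ("true", "t", "1")
    # sensors_enabled is a comma-separated list of sensor class names.
    sensors = {n.strip() for n in lookup("sensors_enabled").split(",") if n.strip()}
    return enabled, sensors


cfg = """
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True
"""

print(read_smart_sensor_settings(cfg, environ={}))  # (True, {'MyCustomSensor'})
print(read_smart_sensor_settings("", environ={}))   # (False, {'NamedHivePartitionSensor'})
```

The second call reproduces exactly what the logs show, which is what a process would see if the override never reached it.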

The rest of my code

Relevant code from MyCustomSensor:

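import logging

from airflow.sensors.base import BaseSensorOperator

log = logging.getLogger(__name__)


class MyCustomSensor(BaseSensorOperator):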
    poke_context_fields = ['some_arg', 'use_smart_sensor']

    def __init__(self, some_arg,
                 use_smart_sensor=False,
                 *args, **kwargs):
        self.some_arg = some_arg
        self.use_smart_sensor = use_smart_sensor
        super(MyCustomSensor, self).__init__(*args, **kwargs)
    
    def is_smart_sensor_compatible(self):

        # If we have turned it off.
        if not self.use_smart_sensor:
            is_compatible = False
        else:
            self.soft_fail = False

            # super() should be BaseSensorOperator
            is_compatible = super().is_smart_sensor_compatible()

        log.info(f'{self.sensor_service_enabled=}')
        log.info(f'{self.sensors_support_sensor_service=}')

        if is_compatible:
            log.info('Sensor IS Smart Sensor compatible')
        else:
            log.info('Sensor is NOT Smart Sensor compatible')
        return is_compatible
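The override above delegates to BaseSensorOperator.is_smart_sensor_compatible(). As far as I can tell from the Airflow 2.1 source (check it against your installed version), that check returns False when the service is disabled or when any of on_success_callback, on_retry_callback or on_failure_callback is set; otherwise it tests whether the sensor's class name is in sensors_enabled. The hypothetical SensorSettings class below is a plain-Python model of that rule, fed with the values from the logs:

```python
from dataclasses import dataclass, field
from typing import Callable, Optional, Set


@dataclass
class SensorSettings:
    """Hypothetical stand-in for the sensor attributes the check reads."""
    class_name: str
    sensor_service_enabled: bool
    sensors_support_sensor_service: Set[str] = field(default_factory=set)
    on_success_callback: Optional[Callable] = None
    on_retry_callback: Optional[Callable] = None
    on_failure_callback: Optional[Callable] = None


def is_smart_sensor_compatible(s: SensorSettings) -> bool:
    """Simplified model of the base-class check (assumed, not Airflow's code)."""
    # Any of these disqualifies the task: service off, or a callback attached.
    blockers = [
        not s.sensor_service_enabled,
        s.on_success_callback,
        s.on_retry_callback,
        s.on_failure_callback,
    ]
    if any(blockers):
        return False
    # The class name must match an entry in [smart_sensor] sensors_enabled exactly.
    return s.class_name in s.sensors_support_sensor_service


# Values reported in the question's logs: the defaults, so the check fails.
logged = SensorSettings("MyCustomSensor", False, {"NamedHivePartitionSensor"})
# Values the configuration was meant to produce.
intended = SensorSettings("MyCustomSensor", True, {"MyCustomSensor"})

print(is_smart_sensor_compatible(logged))    # False
print(is_smart_sensor_compatible(intended))  # True
```

If this reading is right, attaching a failure callback (for example through default_args) would also make the task fall back to a regular sensor, even once the configuration is applied.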

How I create my sensor task:

# NOTE: I think that poke_interval may
#   be ignored when we are using Smart 
#   Sensors.
my_sensor = MyCustomSensor(
    task_id='some_name',
    some_arg='some_other_name',
    timeout=518400,
    mode='reschedule',
    poke_interval=30,
    use_smart_sensor=True,
    dag=dag
)

I am using Cloud Composer, specifically version composer-1.17.1-airflow-2.1.2. I have verified that these are not blocked configurations for Cloud Composer.

Upvotes: 0

Views: 558

Answers (1)

Ricco D

Reputation: 7287

I was able to reproduce your error with your code after updating the values of sensors_enabled and use_smart_sensor on my existing Cloud Composer instance with the gcloud composer environments update command. It seems that Cloud Composer did not apply the new configuration at runtime.

However, I found a workaround. See the steps below:

  1. I created a new Cloud Composer instance and defined the Airflow configuration overrides on the creation page.


  2. Using your DAG, I ran a test, and the Airflow configuration was applied correctly.

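The same overrides can also be supplied from the command line when creating an environment. As far as I know, gcloud composer environments create accepts them through --airflow-configs as comma-separated SECTION-KEY=VALUE pairs. The sketch below only builds that command (it does not run it); composer_create_command is a hypothetical helper, and my-env and us-central1 are placeholders.

```python
import shlex
from typing import Dict, List, Tuple


def composer_create_command(env_name: str, location: str,
                            overrides: Dict[Tuple[str, str], str]) -> List[str]:
    """Build (but do not run) a gcloud argv with Airflow config overrides."""
    for value in overrides.values():
        # A comma inside a value would clash with gcloud's list delimiter.
        if "," in value:
            raise ValueError(f"value needs gcloud escaping: {value!r}")
    # Each override becomes SECTION-KEY=VALUE, joined with commas.
    pairs = ",".join(f"{section}-{key}={value}"
                     for (section, key), value in overrides.items())
    return [
        "gcloud", "composer", "environments", "create", env_name,
        "--location", location,
        f"--airflow-configs={pairs}",
    ]


cmd = composer_create_command("my-env", "us-central1", {
    ("smart_sensor", "use_smart_sensor"): "True",
    ("smart_sensor", "sensors_enabled"): "MyCustomSensor",
})
print(shlex.join(cmd))
```

The ValueError guard is deliberate: listing several sensors in sensors_enabled needs gcloud's alternate-delimiter escaping, which this sketch does not attempt.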

I filed a report on the public issue tracker to bring this to the Cloud Composer engineering team's attention.

UPDATE:

Another workaround is to set a dummy environment variable, which forces the workers to restart and pick up the Airflow configuration changes.

gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy
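If you need to repeat this workaround, a fixed value such as DUMMY=dummy only changes the environment the first time. Below is a sketch that builds the same command with a timestamp as the value, assuming (not verified here) that Composer ignores or rejects an update that changes nothing. force_restart_command is a hypothetical helper, and my-env and us-central1 are placeholders.

```python
import shlex
import time
from typing import List, Optional


def force_restart_command(env_name: str, location: str,
                          stamp: Optional[int] = None) -> List[str]:
    """Build (but do not run) the dummy-variable update with a fresh value."""
    # A new timestamp on every call guarantees the variable actually changes.
    stamp = int(time.time()) if stamp is None else stamp
    return [
        "gcloud", "composer", "environments", "update", env_name,
        "--location", location,
        f"--update-env-variables=DUMMY={stamp}",
    ]


print(shlex.join(force_restart_command("my-env", "us-central1", stamp=1700000000)))
```

To actually run it, pass the returned list to subprocess.run(cmd, check=True).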

UPDATE 2:

Starting with Airflow 2.2, Deferrable Operators are the preferred alternative to Smart Sensors, so consider that feature first.

Upvotes: 1
