Reputation: 8860
I am trying to use Airflow's Smart Sensors feature on a custom sensor operator (i.e. a subclass of BaseSensorOperator). The documentation on this feature is pretty sparse right now.
The shard jobs (smart_sensor_group_shard_[x]) are running, but I don't think they are picking up my sensors. Their logs say Loaded 0 sensor_works.
I think the problem is that BaseSensorOperator.is_smart_sensor_compatible() is returning False even though I have turned the feature on in my configuration. Here is my configuration:
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True
But here are the logs from MyCustomSensor:
INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible
As you can see, the operator still sees Airflow's default settings for those configuration values. I don't know why this inconsistency happens, because I can see the configuration set properly in the UI.
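A minimal standalone sketch of that check, assuming the Airflow 2.1 behavior in which BaseSensorOperator.__init__ copies use_smart_sensor and sensors_enabled from the [smart_sensor] section into sensor_service_enabled and sensors_support_sensor_service, and is_smart_sensor_compatible() then requires the flag to be on and the operator's class name to appear in that list. The on_*_callback checks Airflow also performs are left out, and both helper functions are hypothetical, not Airflow APIs. It illustrates that the logged defaults make MyCustomSensor incompatible, while the intended configuration would not.

```python
"""Hypothetical model of Airflow 2.1's Smart Sensor compatibility check.

Assumption: mirrors how BaseSensorOperator uses the [smart_sensor] options;
the on_*_callback checks that Airflow also performs are omitted.
"""
import configparser

# Default [smart_sensor] values, matching what the task logged.
DEFAULTS = {"use_smart_sensor": "False", "sensors_enabled": "NamedHivePartitionSensor"}


def load_smart_sensor_settings(cfg_text):
    """Return (use_smart_sensor, set of sensors_enabled) as one process sees them."""
    parser = configparser.ConfigParser()
    parser.read_dict({"smart_sensor": DEFAULTS})  # start from the defaults
    parser.read_string(cfg_text)  # overrides from airflow.cfg-style text, if any
    section = parser["smart_sensor"]
    enabled = section.getboolean("use_smart_sensor")
    sensors = {name.strip() for name in section.get("sensors_enabled").split(",")}
    return enabled, sensors


def is_smart_sensor_compatible(class_name, enabled, sensors):
    # The feature flag must be on AND the class name must be listed verbatim.
    return enabled and class_name in sensors


if __name__ == "__main__":
    user_cfg = "[smart_sensor]\nsensors_enabled = MyCustomSensor\nuse_smart_sensor = True\n"
    for label, text in [("defaults", ""), ("user config", user_cfg)]:
        enabled, sensors = load_smart_sensor_settings(text)
        print(label, is_smart_sensor_compatible("MyCustomSensor", enabled, sensors))
    # Prints "defaults False", then "user config True".
```

The check only consults values already loaded into the running process, so if that process still holds the defaults, a correct configuration shown elsewhere does not change the result.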
Relevant code from MyCustomSensor:
from airflow.sensors.base import BaseSensorOperator


class MyCustomSensor(BaseSensorOperator):
    # Arguments the Smart Sensor service serializes (the "poke context") to run poke().
    poke_context_fields = ['some_arg', 'use_smart_sensor']

    def __init__(self, some_arg,
                 use_smart_sensor=False,
                 *args, **kwargs):
        self.some_arg = some_arg
        self.use_smart_sensor = use_smart_sensor
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def is_smart_sensor_compatible(self):
        # If we have turned it off.
        if not self.use_smart_sensor:
            is_compatible = False
        else:
            self.soft_fail = False
            # super() should be BaseSensorOperator
            is_compatible = super().is_smart_sensor_compatible()
        # self.log is the task logger provided by BaseOperator.
        self.log.info(f'{self.sensor_service_enabled=}')
        self.log.info(f'{self.sensors_support_sensor_service=}')
        if is_compatible:
            self.log.info('Sensor IS Smart Sensor compatible')
        else:
            self.log.info('Sensor is NOT Smart Sensor compatible')
        return is_compatible

    # poke() omitted for brevity.
How I create my sensor task:
# NOTE: I think that poke_interval may be
# ignored when we are using Smart Sensors.
my_sensor = MyCustomSensor(
    task_id='some_name',
    some_arg='some_other_name',
    timeout=518400,
    mode='reschedule',
    poke_interval=30,
    use_smart_sensor=True,
    dag=dag
)
I am using Cloud Composer, specifically version composer-1.17.1-airflow-2.1.2. I have verified that these are not blocked configurations for Cloud Composer.
Upvotes: 0
Views: 558
Reputation: 7287
I was able to reproduce your error using your code after updating the values of sensors_enabled and use_smart_sensor with the gcloud composer environments update command on my existing Cloud Composer instance. It seems that Cloud Composer did not apply the new configuration at runtime.
But I was able to find a workaround for this. See steps below:
I filed a report on the public issue tracker to notify the Cloud Composer engineering team.
UPDATE:
Another workaround is to set a dummy environment variable, which forces the workers to restart and pick up the changes to the Airflow configuration.
gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy
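The restart workaround is consistent with how Airflow loads settings: its conf object is built when a process starts, so a worker started before the change can keep the old values. As a rough way to see which value a freshly started process would resolve, here is a simplified standalone sketch of the lookup order. It assumes only two sources: an AIRFLOW__{SECTION}__{KEY} environment variable, which takes precedence, then airflow.cfg. Real Airflow also consults *_cmd/*_secret variants for some options plus built-in defaults, and effective_option is a hypothetical helper; inside a DAG you would normally call airflow.configuration.conf.get instead.

```python
"""Simplified sketch of how an Airflow process resolves one config option.

Assumption: only two sources are modeled, in Airflow's precedence order:
1. an AIRFLOW__{SECTION}__{KEY} environment variable,
2. the option in airflow.cfg.
Real Airflow also checks *_cmd / *_secret variants and built-in defaults.
"""
import configparser
import os


def effective_option(section, key, cfg_path, environ=None):
    """Return the raw string value, or None if neither source defines it."""
    environ = os.environ if environ is None else environ
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name]  # environment variables win over the file
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(cfg_path)  # a missing file is silently skipped
    return parser.get(section, key, fallback=None)


if __name__ == "__main__":
    # Airflow looks for airflow.cfg under $AIRFLOW_HOME (default ~/airflow).
    home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    cfg = os.path.join(home, "airflow.cfg")
    for key in ("use_smart_sensor", "sensors_enabled"):
        print(key, "=", effective_option("smart_sensor", key, cfg))
```

Comparing what this reads with the values the sensor itself logs is one way to tell whether a running process is holding stale settings.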
UPDATE 2:
Beginning with Airflow 2.2, Deferrable Operators are preferred as a solution over Smart Sensors. You should look at that feature first.
Upvotes: 1