alital

Reputation: 431

Airflow SQSSensor message filtering

Given the JSON below:

{ "Model" : "level1" }

What is the right combination of message_filtering_match_values and message_filtering_config values? I tried the following, but it fails:

model_operator = SQSSensor(
  task_id='model_operator',
  dag=dag,
  sqs_queue='https://sqs.somewhere/somequeue.fifo',
  aws_conn_id='aws_default',
  message_filtering='jsonpath',
  message_filtering_config='Model[*]',
  message_filtering_match_values=['level1'],
  mode='reschedule')

The error message is:

Broken DAG: [/usr/local/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 94, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 414, in __init__
    "arguments were:\n**kwargs: {k}".format(c=self.__class__.__name__, k=kwargs, t=task_id),
airflow.exceptions.AirflowException: Invalid arguments were passed to SQSSensor (task_id: model_operator). Invalid arguments were:
**kwargs: {'message_filtering': 'jsonpath', 'message_filtering_config': 'Model[*]', 'message_filtering_match_values': ['level1']}

Upvotes: 0

Views: 1072

Answers (1)

Elad Kalif

Reputation: 15979

The message_filtering, message_filtering_config, and message_filtering_match_values parameters were added recently in a PR that was released in Amazon provider version 2.2.0. The traceback shows that the operator does not recognize these parameters, which means you are running an older version of the Amazon provider.
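To confirm which provider version is actually installed, something like the following could be run in the same Python environment that Airflow uses. This is only a rough sketch: the helper names (parse_version, supports_message_filtering, installed_provider_version) are made up for illustration, and the comparison looks only at the first three numeric components, so a pre-release of 2.2.0 would be treated the same as 2.2.0.

```python
import re

try:
    from importlib import metadata  # standard library on Python 3.8+
except ImportError:
    import importlib_metadata as metadata  # backport package on Python 3.7

PROVIDER_PACKAGE = "apache-airflow-providers-amazon"
MIN_VERSION = (2, 2, 0)  # release that added the message_filtering* parameters


def parse_version(version):
    """Return the first three numeric components of a version string as a tuple."""
    numbers = [int(n) for n in re.findall(r"\d+", version)[:3]]
    numbers += [0] * (3 - len(numbers))  # pad "2.2" to (2, 2, 0)
    return tuple(numbers)


def supports_message_filtering(version):
    """Hypothetical helper: True if this provider version is 2.2.0 or newer."""
    return parse_version(version) >= MIN_VERSION


def installed_provider_version():
    """Return the installed Amazon provider version, or None if it is not installed."""
    try:
        return metadata.version(PROVIDER_PACKAGE)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_provider_version()
    if version is None:
        print(f"{PROVIDER_PACKAGE} is not installed in this environment")
    elif supports_message_filtering(version):
        print(f"{PROVIDER_PACKAGE} {version} should accept message_filtering")
    else:
        print(f"{PROVIDER_PACKAGE} {version} is older than 2.2.0")
```

If this reports a version older than 2.2.0, or no package at all, that would be consistent with the "Invalid arguments were passed" error in the question.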

You should upgrade the Amazon provider to the latest version.

pip install apache-airflow-providers-amazon --upgrade

It's also recommended to read the documentation about constraint files.
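For reference, Airflow publishes its constraint files at a URL built from the Airflow version and the Python version. A small sketch of how that URL is put together is shown here; the function name constraints_url and the version numbers are only placeholders for illustration, so substitute the versions you actually run.

```python
def constraints_url(airflow_version, python_version):
    """Build the URL of the constraint file for an Airflow/Python version pair."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Placeholder versions, not taken from the question.
print(constraints_url("2.1.4", "3.7"))
# https://raw.githubusercontent.com/apache/airflow/constraints-2.1.4/constraints-3.7.txt
```

The resulting URL can be passed to pip with the --constraint option. Check the documentation for how constraints interact with upgrading a provider separately from Airflow itself.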

You didn't mention which Airflow version or which Amazon provider version you are running, so be sure to read the changelogs in case the upgrade crosses a major version.

Upvotes: 2
