Reputation: 431
Given below json:
{ "Model" : "level1" }
what is the right combination of message_filtering_match_values
and message_filtering_config
values? I try below but it fails:
model_operator = SQSSensor(
task_id='model_operator',
dag=dag,
sqs_queue='https://sqs.somewhere/somequeue.fifo',
aws_conn_id='aws_default',
message_filtering='jsonpath',
message_filtering_config='Model[*]',
message_filtering_match_values=['level1'],
mode='reschedule')
Error message is:
Broken DAG: [/usr/local/airflow/dags/test_dag.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 94, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 414, in __init__
"arguments were:\n**kwargs: {k}".format(c=self.__class__.__name__, k=kwargs, t=task_id),
airflow.exceptions.AirflowException: Invalid arguments were passed to SQSSensor (task_id: model_operator). Invalid arguments were:
**kwargs: {'message_filtering': 'jsonpath', 'message_filtering_config': 'Model[*]', 'message_filtering_match_values': ['level1']}
Upvotes: 0
Views: 1072
Reputation: 15979
The message_filtering
/ message_filtering_config
/ message_filtering_match_values
were added recently in PR it was released in Amazon provider version 2.2.0
From the traceback we can see that these parameters are not recognized by the operator which means that you are running an older version of the Amazon provider.
You should upgrade the Amazon provider to the latest version.
pip install apache-airflow-providers-amazon --upgrade
It's also recommended to read the documentation about constraint files.
You didn't mention what Airflow version you are running nor what version of the Amazon provider so note to read the change logs in case you are upgrading major version.
Upvotes: 2