Reputation: 8498
I can generate a CLI token for an Amazon Managed Workflows for Apache Airflow (MWAA) environment with aws mwaa create-cli-token, but every API call I make to the Airflow environment with that token is rejected. Why are my requests being refused here (the response shown below is a 401 Unauthorized)?
import json
from datetime import datetime
from typing import Callable, List

import pytest
import requests


@pytest.fixture(scope="function")
def run_dag(
        environment_name: str,
        airflow_conn_info: AirflowConnectionInfo,
        dag_to_run: str,
        target_date: datetime) -> Callable:
    headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': airflow_conn_info.cli_token,
    }
    trigger_body = {
        "conf": {},
        "execution_date": target_date.isoformat(),
    }
    if not dag_to_run:
        # NOTE: carrier_name is not defined in this snippet.
        dag_to_run = f"{carrier_name}_dag"
    base_url = f"https://{airflow_conn_info.hostname}/api/v1/dags/{dag_to_run}"
    trigger_dag_url = f"{base_url}/dagRuns"
    # TODO: Add some sort of check to ensure a DAG isn't disabled OR add a call
    # to enable the DAG. For now we're just going to assume it's enabled.
    trigger_response = requests.post(
        trigger_dag_url,
        headers=headers,
        data=json.dumps(trigger_body))
    if trigger_response.status_code != 200:
        raise ValueError(f"{trigger_response}")
    # A DAG run is addressed by its dag_run_id, not by the dag_id.
    dag_run_id = trigger_response.json()["dag_run_id"]
    dag_run_status_url = f"{base_url}/dagRuns/{dag_run_id}"
    status_body = {}
    task_instances_status_url = f"{dag_run_status_url}/taskInstances"
    task_instances_body = {}
    status_response = requests.get(
        dag_run_status_url,
        headers=headers,
        data=json.dumps(status_body))
    if status_response.status_code != 200:
        raise ValueError(f"{status_response}")
    # Derived from
    # https://github.com/apache/airflow/blob/main/airflow/utils/state.py
    terminal_states: List[str] = ["success", "failed"]
    # TODO: Add a timeout to this while loop.
    # Poll the latest status response, not the original trigger response.
    while (status_response.status_code == 200
           and status_response.json()["state"] not in terminal_states):
        # TODO: Add some sort of console output to show what task instance we're
        # on and the state of that current task instance.
        status_response = requests.get(
            dag_run_status_url,
            headers=headers,
            data=json.dumps(status_body))
        task_instances_response = requests.get(
            task_instances_status_url,
            headers=headers,
            data=json.dumps(task_instances_body))
        breakpoint()
(Pdb) base_url
'https://{a_string}.c46.us-east-1.airflow.amazonaws.com/api/v1/dags/fedex_dag'
(Pdb) trigger_response.json()
{'detail': None, 'status': 401, 'title': 'Unauthorized', 'type': 'https://airflow.apache.org/docs/2.0.2/stable-rest-api-ref.html#section/Errors/Unauthenticated'}
(Pdb) headers
{'accept': 'application/json', 'Content-Type': 'application/json', 'Authorization': '{secret}'}
Upvotes: 0
Views: 1443
Reputation: 8498
TLDR: The Airflow REST API is turned off by default on MWAA, and this is not explicitly stated anywhere. Use the https://YOUR_HOST_NAME/aws_mwaa/cli endpoint unless you want to enable the full API.
When I read the AWS documentation on generating a CLI token, it was not clear to me that the aws_mwaa/cli endpoint that AWS adds to MWAA is the endpoint they want you to use with that token. This is explained in the user guide but not in any of the website documentation, which made it very unclear to me.
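For illustration, here is a minimal sketch of what a call to the aws_mwaa/cli endpoint can look like. It assumes the pattern from the AWS user guide: the token goes in a Bearer Authorization header, the request body is the plain-text Airflow CLI command, and the JSON response carries base64-encoded stdout and stderr. The function names (build_cli_request, decode_cli_response, run_cli_command) are my own and not part of any AWS library, and I use the standard library's urllib here only so the snippet has no third-party dependencies; requests would work equally well.

```python
import base64
import json
import urllib.request
from typing import Tuple


def build_cli_request(hostname: str, cli_token: str,
                      command: str) -> urllib.request.Request:
    """Build a POST to the MWAA CLI endpoint.

    The body is the raw Airflow CLI command, e.g. "dags trigger fedex_dag"
    (Airflow 2.x syntax; assumed here, adjust for your Airflow version).
    """
    return urllib.request.Request(
        url=f"https://{hostname}/aws_mwaa/cli",
        data=command.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {cli_token}",
            "Content-Type": "text/plain",
        },
        method="POST",
    )


def decode_cli_response(payload: dict) -> Tuple[str, str]:
    """Return (stdout, stderr); both fields are assumed to be base64-encoded."""
    stdout = base64.b64decode(payload.get("stdout", "")).decode("utf-8")
    stderr = base64.b64decode(payload.get("stderr", "")).decode("utf-8")
    return stdout, stderr


def run_cli_command(hostname: str, cli_token: str,
                    command: str) -> Tuple[str, str]:
    """Send the command to the environment and return the decoded output."""
    request = build_cli_request(hostname, cli_token, command)
    with urllib.request.urlopen(request) as response:
        return decode_cli_response(json.load(response))
```

With the fixture from the question, this would be called roughly as run_cli_command(airflow_conn_info.hostname, airflow_conn_info.cli_token, "dags trigger fedex_dag"). The hostname and token are the WebServerHostname and CliToken values returned by create-cli-token.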
There is an AmazonMWAAFullApiAccess policy, which sounds like it grants access to the full API if you can assume a role with that policy attached, but I have not tested this yet.
Upvotes: 1