AlexLordThorsen

401 From MWAA Airflow Environment When Attempting To Run A DAG

Description

When attempting to call the Airflow REST API of an Amazon Managed Workflows for Apache Airflow (MWAA) environment, every request was rejected, even though I was able to generate a CLI token with aws mwaa create-cli-token. Why am I getting a 401 Unauthorized error here?

Console and Code

Code

from __future__ import annotations

import json
import time
from datetime import datetime
from typing import Callable, List

import pytest
import requests

# AirflowConnectionInfo is a project-specific type (not shown) exposing the
# MWAA web server hostname and the token from aws mwaa create-cli-token.


@pytest.fixture(scope="function")
def run_dag(
        environment_name: str,
        airflow_conn_info: AirflowConnectionInfo,
        dag_to_run: str,
        target_date: datetime,
        carrier_name: str) -> Callable:  # carrier_name: assumed to be another fixture
    headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': airflow_conn_info.cli_token,
    }
    trigger_body = {
        "conf": {},
        "execution_date": target_date.isoformat(),
    }
    if not dag_to_run:
        dag_to_run = f"{carrier_name}_dag"

    base_url = f"https://{airflow_conn_info.hostname}/api/v1/dags/{dag_to_run}"
    trigger_dag_url = f"{base_url}/dagRuns"
    # TODO: Add some sort of check to ensure a DAG isn't disabled OR add a call
    #       to enable the DAG. For now we're just going to assume it's enabled.
    trigger_response = requests.post(
        trigger_dag_url,
        headers=headers,
        data=json.dumps(trigger_body))
    if trigger_response.status_code != 200:
        raise ValueError(f"{trigger_response}")

    # The new run is identified by its dag_run_id (not the dag_id) in the
    # dagRuns/{dag_run_id} status and task-instance endpoints.
    dag_run_id = trigger_response.json()["dag_run_id"]
    dag_run_status_url = f"{base_url}/dagRuns/{dag_run_id}"
    task_instances_status_url = f"{dag_run_status_url}/taskInstances"

    # GET requests need no body, so none is sent.
    status_response = requests.get(dag_run_status_url, headers=headers)
    if status_response.status_code != 200:
        raise ValueError(f"{status_response}")

    # Derived from
    # https://github.com/apache/airflow/blob/main/airflow/utils/state.py
    terminal_states: List[str] = ["success", "failed"]
    # TODO: Add a timeout to this while loop.
    while (status_response.status_code == 200
            and status_response.json()["state"] not in terminal_states):
        # TODO: Add some sort of console output to show what task instance we're
        #       on and the state of that current task instance.
        time.sleep(10)  # Arbitrary poll interval so the web server isn't hammered.
        status_response = requests.get(dag_run_status_url, headers=headers)
        task_instances_response = requests.get(
            task_instances_status_url, headers=headers)
        # Inspect task_instances_response here (e.g. with breakpoint()) while debugging.
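
The TODO about adding a timeout to the polling loop could be handled with a small helper like the one below. This is a minimal sketch, not part of the original fixture: wait_for_terminal_state, its fetch_state callable, and the timeout_s / poll_interval_s parameters are hypothetical names. The fetch function, clock, and sleep are injected so the loop can be exercised without a live MWAA environment.

```python
import time
from typing import Callable, Iterable


def wait_for_terminal_state(
        fetch_state: Callable[[], str],
        terminal_states: Iterable[str] = ("success", "failed"),
        timeout_s: float = 600.0,
        poll_interval_s: float = 10.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll fetch_state() until it returns a terminal state or time runs out.

    fetch_state is hypothetical: in the fixture it would wrap the GET on the
    dagRuns/{dag_run_id} URL and return response.json()["state"].
    """
    terminal = set(terminal_states)
    deadline = clock() + timeout_s
    while True:
        state = fetch_state()
        if state in terminal:
            return state
        # Give up once the deadline has passed instead of looping forever.
        if clock() >= deadline:
            raise TimeoutError(
                f"DAG run still in state {state!r} after {timeout_s} seconds")
        sleep(poll_interval_s)
```

In the fixture, the while loop could then become something like wait_for_terminal_state(lambda: requests.get(dag_run_status_url, headers=headers).json()["state"]), which raises TimeoutError instead of polling forever.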

Failing Run PDB

(Pdb) base_url
'https://{a_string}.c46.us-east-1.airflow.amazonaws.com/api/v1/dags/fedex_dag'
(Pdb) trigger_response.json()
{'detail': None, 'status': 401, 'title': 'Unauthorized', 'type': 'https://airflow.apache.org/docs/2.0.2/stable-rest-api-ref.html#section/Errors/Unauthenticated'}
(Pdb) headers
{'accept': 'application/json', 'Content-Type': 'application/json', 'Authorization': '{secret}'}

Answers (1)

AlexLordThorsen

TL;DR: The Airflow REST API is turned off by default on MWAA, and this is not explicitly stated anywhere. Use the https://YOUR_HOST_NAME/aws_mwaa/cli endpoint instead, unless you want to enable the full API.

When I read the AWS documentation on generating a CLI token, what was not clear to me is that the aws_mwaa/cli endpoint, which AWS adds to the MWAA environment, is the endpoint you are meant to call with that token. This is explained in the MWAA User Guide but not in the rest of the online documentation, which made it very unclear to me.
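
A minimal Python sketch of that approach, assuming boto3 and requests are installed and that AWS credentials allowed to call create_cli_token are configured. It follows the pattern shown in the MWAA User Guide: the token from create_cli_token is sent as a Bearer token, the Airflow CLI command (without the leading "airflow") is the plain-text request body, and the stdout / stderr fields of the JSON response come back base64-encoded. The helper names build_cli_request, decode_cli_response, and run_mwaa_cli are hypothetical.

```python
import base64
from typing import Dict, Tuple


def build_cli_request(
        hostname: str, cli_token: str, command: str) -> Tuple[str, Dict[str, str], str]:
    """Return (url, headers, body) for a POST to the MWAA CLI endpoint."""
    url = f"https://{hostname}/aws_mwaa/cli"
    headers = {
        # The CLI token is sent as a bearer token.
        "Authorization": f"Bearer {cli_token}",
        # The body is the Airflow CLI command as plain text.
        "Content-Type": "text/plain",
    }
    return url, headers, command


def decode_cli_response(payload: Dict[str, str]) -> Tuple[str, str]:
    """Decode the base64-encoded stdout/stderr fields of a CLI response."""
    stdout = base64.b64decode(payload.get("stdout", "")).decode("utf-8")
    stderr = base64.b64decode(payload.get("stderr", "")).decode("utf-8")
    return stdout, stderr


def run_mwaa_cli(environment_name: str, command: str) -> Tuple[str, str]:
    """Create a CLI token for the environment and run one Airflow CLI command."""
    # Imported here so the pure helpers above work without these packages.
    import boto3
    import requests

    token = boto3.client("mwaa").create_cli_token(Name=environment_name)
    url, headers, body = build_cli_request(
        token["WebServerHostname"], token["CliToken"], command)
    response = requests.post(url, headers=headers, data=body, timeout=30)
    response.raise_for_status()
    return decode_cli_response(response.json())
```

Triggering the DAG from the question would then look something like stdout, stderr = run_mwaa_cli("my-environment", "dags trigger fedex_dag"), where "my-environment" is a placeholder environment name. Keeping request building and response decoding as separate pure functions makes them easy to test without network access.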

There is also an AmazonMWAAFullApiAccess policy, which sounds like it grants access to the full API to a role that has it attached, but I have not tested this yet.