Berni Hacker

Reputation: 87

Apache Ni-Fi pipeline running on AWS EC2 not starting when triggered from Apache Airflow in another EC2 instance

I have created a DAG file to schedule Python and Apache Ni-Fi tasks.

In order to control the Ni-Fi pipeline, I have followed the suggestions contained in the following page: https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a

I have added a GenerateFlowFile processor to my Ni-Fi pipeline to let Airflow trigger Ni-Fi and an UpdateAttribute processor to let Airflow know when the pipeline is over.

Apache Airflow runs from an AWS EC2 instance and Apache Ni-Fi runs in another AWS EC2 instance.

I have created rules in the AWS security groups to let each of the two EC2 instances access the other one. As the rule source, I have tried both the private and the public IP addresses of the machines, as well as the machines' security groups:

Security group of machine 1:

  1. IPv4 TCP 8443 <public IP address of machine 2>/32
  2. IPv4 TCP 8443 <private IP address of machine 2>/32
  3. IPv4 TCP 8443 <security group of machine 2>

Security group of machine 2:

  1. IPv4 TCP 8443 <public IP address of machine 1>/32
  2. IPv4 TCP 8443 <private IP address of machine 1>/32
  3. IPv4 TCP 8443 <security group of machine 1>

Problem: The DAG runs the Python script task successfully but fails on the Ni-Fi tasks. The pipeline itself works fine when started from within Ni-Fi.

Here is an extract of the DAG file:

# Importing the needed modules
import requests
import json
from time import time
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Functions to be used in the Ni-FI tasks


def get_processor(url_nifi_api: str, processor_id: str, token=None):
    """
    This function gets and returns a single processor.
    It makes use of the REST API `/processors/{processor_id}`.
    :param url_nifi_api: String
    :param processor_id: String
    :param token: JWT access token
    :returns: JSON object processor
    """

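    # Authorization header (only sent when a token is available)
    if token is None:
        header = {"Content-Type": "application/json"}
    else:
        header = {
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(token),
        }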

    # GET processor and parse to JSON
    response = requests.get(url_nifi_api + f"processors/{processor_id}", headers=header)
    return json.loads(response.content)


def get_processor_state(url_nifi_api: str, processor_id: str, token=None):
    """
    This function gets and returns a single processor state.
    It makes use of the REST API 'processors/{processor_id}/state'.
    :param url_nifi_api: String
    :param processor_id: String
    :param token: JWT access token
    :returns: JSON object processor's state
    """

    # Authorization header
    if token is None:
        header = {"Content-Type": "application/json"}
    else:
        header = {
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(token),
        }

    # GET processor and parse to JSON
    response = requests.get(
        url_nifi_api + f"processors/{processor_id}/state", headers=header
    )
    return json.loads(response.content)


def get_token(url_nifi_api: str, access_payload: dict):
    """
    This function retrieves a JWT token by authenticating the user.
    It makes use of the REST API `/access/token`.
    :param url_nifi_api: the basic URL to the NiFi API.
    :param access_payload: dictionary with keys 'username' & 'password' and
                           fitting values.
    :return: JWT Token
    """

    # Authorization header
    header = {
        "Accept-Encoding": "gzip, deflate, br",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*",
    }

    response = requests.post(
        url_nifi_api + "access/token", headers=header, data=access_payload
    )
    return response.content.decode("ascii")


def update_processor_status(processor_id: str, new_state: str, token, url_nifi_api):
    """
    This function starts or stops a processor by retrieving the processor to get
    the current revision and finally put a JSON with the desired
    state towards the API.
    It needs the function get_processor
    :param processor_id: Id of the processor to receive the new state.
    :param new_state: String representing the new state,
                      acceptable values are: STOPPED or RUNNING.
    :param token: a JWT access token for NiFi.
    :param url_nifi_api: URL to the NiFi API
    :return: None
    """

    # Retrieve processor from `/processors/{processor_id}`
    processor = get_processor(url_nifi_api, processor_id, token)

    # Create a JSON with the new state and the processor's revision
    put_dict = {
        "revision": processor["revision"],
        "state": new_state,
        "disconnectedNodeAcknowledged": True,
    }

    # Serialize to JSON and PUT the new run status
    payload = json.dumps(put_dict).encode("utf8")
    header = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(token),
    }
    response = requests.put(
        url_nifi_api + f"processors/{processor_id}/run-status",
        headers=header,
        data=payload,
    )
    return response


def parse_state(json_obj, state_key: str):
    """
    This function retrieves the value of a state by the key of the state out
    of the JSON.
    :param json_obj: the processor's general state.
    :param state_key: the key for the specific state.
    :raises ValueError: if the passed key cannot be found in the processor state.
    :return: value of the matching key.
    """
    states = json_obj["componentState"]["localState"]["state"]
    for state in states:
        if state["key"] == state_key:
            value = state["value"]
            return value
    raise ValueError(f"Could not find {state_key} ")


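def pause(secs):
    # Sleep instead of spinning in a busy-wait loop, which would keep one
    # CPU core fully loaded for the whole waiting period
    from time import sleep
    sleep(secs)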


def startnifi():
    # This function sets the Ni-Fi processor GenerateFlowFile into RUNNING state,
    # it waits 15 seconds to give the processor time to create a flowfile,
    # it sets the GenerateFlowFile processor into STOPPED state.
    # It needs the following functions:
    # - get_token (retrieves a JSON Web Token by authenticating the user)
    # - update_processor_status
    #   (retrieves the current status with a GET request, sets the state into
    #   a custom JSON and uses a PUT request to set a new status)
    # Initializing the variables
    url_nifi_api = "https://<ip address>:8443/nifi/?processGroupId=<clip>"
    # Check the id from the processor settings page in Ni-Fi
    processor_id = "<GenerateFlowFile processor id>"
    access_payload = {"username": "<username>",
                      "password": "<password>"
                      }

    token = get_token(url_nifi_api, access_payload)
    response = update_processor_status(processor_id, "RUNNING", token, url_nifi_api)
    pause(15)  # wait for 15 seconds to give NiFi time to create a flow file
    response = update_processor_status(processor_id, "STOPPED", token, url_nifi_api)


def finishnifi():
    # This function queries the  last Ni-Fi processor (UpdateAttribute) state
    # by using a while loop that allows reiterating the query every 60 seconds.
    # Initializing the variables
    url_nifi_api = "https://<ip address>:8443/nifi/?processGroupId=<clip>"
    # Check the id from the processor settings page in Ni-Fi
    processor_id = "<UpdateAttribute processor id>"
    access_payload = {"username": "<username>",
                      "password": "<password>"
                      }
    timestamp_property = "last_tms"  # the processor's attribute name

    token = get_token(url_nifi_api, access_payload)

    # Get the current timestamp
    processor_state = get_processor_state(url_nifi_api, processor_id, token=token)
    value_start = parse_state(processor_state, timestamp_property)

    # Query and wait until the timestamp changes (no timeout is enforced)
    while True:
        processor_state = get_processor_state(url_nifi_api, processor_id, token=token)
        value_current = parse_state(processor_state, timestamp_property)

        if value_start == value_current:
            print("Waiting...")
            pause(60)
        else:
            print(f"Update found: {value_current}")
            break


# Initializing the default arguments
# Using dynamic dates for start_date would lead to never started tasks
default_args = {
                'owner': 'Benkku',
                'start_date': datetime(2023, 7, 3, 6),  # year, month, day, hour
                'retries': 0,
                'retry_delay': timedelta(minutes=5)
                }

# Instantiating a DAG object
prod_to_alfaDEV_dag = DAG(dag_id='prod_to_alfaDEV',
                       default_args=default_args,
                       description='Tasks to copy Timescale production into '
                       'Timescale alfa',
                       # cron schedule (minute, hour, month day, month, week day)
                       schedule=None,  # for testing only
                       # schedule='6 3 14 * *',  # 03:06 UTC of the 14th of
                                               # each month
                       catchup=False,  # if False the scheduler creates a DAG run
                                       # only for the latest schedule interval
                       tags=['Timescale', 'alfa', 'refresh']
                       )

# Creating the 4th task
# This task starts an Apache Ni-Fi pipeline
nifistart_task = PythonOperator(task_id='nifistart',
                                python_callable=startnifi,
                                dag=prod_to_alfaDEV_dag)

# Creating the 5th task
# This task monitors a Ni-Fi pipeline till the pipeline is over
nififinish_task = PythonOperator(task_id='nififinish',
                                python_callable=finishnifi,
                                dag=prod_to_alfaDEV_dag)

# Setting the order of tasks' execution
nifistart_task >> nififinish_task
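
The polling loop in finishnifi above keeps querying until the timestamp changes, with no upper bound. A minimal sketch of a bounded variant follows. The name wait_for_change and its parameters are hypothetical and not part of the original DAG; read_value stands for any zero-argument callable, for example a lambda wrapping get_processor_state and parse_state.

```python
import time


def wait_for_change(read_value, initial, interval=60.0, timeout=3600.0):
    """Poll read_value() until it differs from initial or timeout elapses.

    read_value is a hypothetical zero-argument callable that returns the
    current value of the monitored state key.
    """
    deadline = time.monotonic() + timeout
    while True:
        current = read_value()
        if current != initial:
            return current
        # Give up if the next sleep would end after the deadline
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"No update within {timeout} seconds")
        time.sleep(interval)  # yields the CPU between queries
```

Raising an exception on timeout lets the Airflow task fail visibly instead of running forever when the pipeline never reaches the last processor.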

Regarding url_nifi_api, in 'ip address' I have tried both the private and the public IP address. The rest of the URL in 'clip' has been copied from Apache Ni-Fi when opening the processor group.

Regarding processor_id, 'processor id' in def startnifi has been copied from Apache Ni-Fi by going to Settings > Id in the first processor (GenerateFlowFile). In def finishnifi the Id has been read from the last processor (UpdateAttribute).

In access_payload I have used the username and password I use in Apache Ni-Fi.

This is the error I get in Airflow when running the DAG:

Failed to establish a new connection: [Errno 111] Connection refused

The error occurs in the first task (nifistart).

Any idea what might be wrong?
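
A "Connection refused" error generally means the target host was reached but nothing accepted the connection on that address and port, whereas traffic dropped by a firewall rule usually shows up as a timeout instead. A minimal sketch for telling the two apart from the Airflow machine is shown here; the function name probe_port is hypothetical, and the host and port must be replaced with the real values.

```python
import socket


def probe_port(host, port, timeout=5.0):
    """Classify a TCP connect attempt as 'open', 'refused', 'timeout' or 'error: ...'."""
    try:
        # create_connection performs a full TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        # The host answered, but no process is listening on this address/port
        return "refused"
    except socket.timeout:
        # No answer at all, for example because packets are silently dropped
        return "timeout"
    except OSError as exc:
        return f"error: {exc}"
```

Calling probe_port("<ip address>", 8443) from the Airflow instance would show whether the problem lies in the network path or in what Ni-Fi is listening on.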

I am running Airflow 2.6.2 and Apache Ni-Fi 1.18.0 on Linux Ubuntu 22.04.1. I have used the Airflow installation from PyPI.

Thanks,

Bernardo Di Chiara

Upvotes: 0

Views: 145

Answers (1)

Berni Hacker

Reputation: 87

The problem has been solved with the following modifications:

  1. Correction of the URL in the DAG file

The URL was wrong: it pointed at the Ni-Fi web UI (/nifi/?processGroupId=...) instead of the REST API. The correct URL in def startnifi() and def finishnifi() is:

url_nifi_api = "https://ipaddress:8443/nifi-api/"

where ipaddress is the private IP address of the machine running Apache Ni-Fi.
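
To illustrate why the base URL matters: the helper functions in the DAG append paths such as access/token to url_nifi_api by plain string concatenation. The sketch below compares both base URLs. The address 10.0.0.5, the group id abc and the helper nifi_endpoint are placeholders invented for this example.

```python
def nifi_endpoint(base_url, *parts):
    """Join path segments onto a base URL such as 'https://host:8443/nifi-api/'."""
    return base_url.rstrip("/") + "/" + "/".join(p.strip("/") for p in parts)


# Placeholder values; substitute the private IP address of the Ni-Fi machine
api_base = "https://10.0.0.5:8443/nifi-api/"
ui_url = "https://10.0.0.5:8443/nifi/?processGroupId=abc"

# REST API base: the path is appended correctly
print(nifi_endpoint(api_base, "access", "token"))
# -> https://10.0.0.5:8443/nifi-api/access/token

# UI URL from the question: the path ends up inside the query string
print(ui_url + "access/token")
# -> https://10.0.0.5:8443/nifi/?processGroupId=abcaccess/token
```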

  2. verify attribute added in the DAG file

Since both machines are in the same private cloud, verify=False has been passed to every requests call (get, post, put) in the DAG file. This disables TLS certificate verification for those calls.
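
One way to avoid repeating the header and verify settings in each function is to build the keyword arguments in a single place. This is only a sketch: nifi_request_kwargs and its ca_bundle parameter are hypothetical names, and the 10-second timeout is an arbitrary assumption. The requests functions accept headers, verify and timeout as keyword arguments, and verify may be either False or a path to a CA bundle file.

```python
def nifi_request_kwargs(token=None, ca_bundle=None, timeout=10):
    """Build keyword arguments for the JSON requests calls against Ni-Fi.

    ca_bundle is a hypothetical path to a PEM file with Ni-Fi's certificate;
    when it is omitted, certificate verification is disabled (verify=False).
    """
    headers = {"Content-Type": "application/json"}
    if token is not None:
        # Only send the Authorization header when a token is available
        headers["Authorization"] = f"Bearer {token}"
    return {
        "headers": headers,
        "verify": ca_bundle if ca_bundle else False,
        "timeout": timeout,
    }
```

A call would then look like requests.get(url_nifi_api + f"processors/{processor_id}", **nifi_request_kwargs(token)). Passing a certificate path instead of disabling verification keeps the TLS check in place if the traffic ever leaves the private network.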

  3. Allowing Ni-Fi to run from other than localhost

In the Linux machine running Ni-Fi, .../nifi-1.18.0/conf/nifi.properties has been edited in order to allow access from another EC2 machine.

Old configuration: nifi.web.https.host = 127.0.0.1

New configuration: nifi.web.https.host = 0.0.0.0

Ni-Fi has then been restarted (nifi.sh stop, followed by nifi.sh start) so that the new setting takes effect.

NiFi has been started in the browser from: https://0.0.0.0:8443/nifi/login
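
The bind address can also be checked programmatically before debugging the network. The following is a simplified sketch that handles only key=value lines, not the full Java properties syntax; read_property and is_loopback_only are hypothetical helper names, and the sample text stands in for the content of nifi.properties.

```python
def read_property(properties_text, key):
    """Return the value of key from simple key=value properties text, or None."""
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip blank lines and comments
        if not line or line.startswith(("#", "!")):
            continue
        name, sep, value = line.partition("=")
        if sep and name.strip() == key:
            return value.strip()
    return None


def is_loopback_only(host):
    """True when the host value binds the web server to the local machine only."""
    return host in ("127.0.0.1", "localhost", "::1")


# Sample content standing in for the old configuration
sample = "nifi.web.https.host=127.0.0.1\nnifi.web.https.port=8443\n"
host = read_property(sample, "nifi.web.https.host")
print(host, is_loopback_only(host))  # -> 127.0.0.1 True
```

A loopback value explains a "Connection refused" from another machine even when the security group rules are correct, because nothing is listening on the instance's private address.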

Upvotes: 0
