Cribber

Reputation: 2943

Create NiFi PutSQL Processor with Python

I want to transfer my NiFi ETL pipeline (mainly PutSQL processors) from my development instance of Apache NiFi to my production instance, ideally with Python so the process is reusable.

As a first attempt, I tried to simply copy the processors over:

  1. GET the processor from DEV with a GET request on /nifi-api/processors/{id}
  2. PUT the processor to the PRD NiFi instance with a PUT request on /nifi-api/processors/{id}

Code:

import json
import requests

# GET the processor from DEV and parse the response body as JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id, headers=header)
processor = json.loads(response.content)

# PUT the processor to PRD (this is the call that fails with 409)
processor['revision']['version'] = 0  # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id,
                        data=payload,
                        headers=header)

This failed on the PUT with an HTTP 409 Conflict error. I am guessing this is because I am trying to PUT a resource to a URI that expects a resource to already exist at that location.
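One way to check a guess like this is to read the response body rather than only the status code. As far as I know, NiFi's REST API usually returns a short plain-text explanation with an error status, but that is an assumption worth verifying on your version. A minimal sketch, where `summarize_error` is a hypothetical helper name and not part of any library:

```python
def summarize_error(status_code: int, body: str) -> str:
    """Build a one-line summary of a failed REST call for logging.

    status_code and body are expected to come from a requests.Response,
    i.e. response.status_code and response.text.
    """
    reason = body.strip() or "(empty response body)"
    return f"HTTP {status_code}: {reason}"


# Typical use after the PUT above (not executed here):
# if not response.ok:
#     print(summarize_error(response.status_code, response.text))
```

Printing this right after the failing call should show whether the conflict is about the missing resource, the revision, or something else.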

The documentation lists "Create a processor, Set properties, Schedule" next to the processor APIs, but on closer inspection there is no dedicated API for creation under that heading. I went with PUT because its description, "Updates a processor", is the closest thing I could find there to creating a new one from scratch.

Do you have any ideas on how to create processors with Python? Either by copying existing ones or creating entirely new ones?

Upvotes: 0

Views: 497

Answers (1)

Cribber

Reputation: 2943

So the problem was that the API documentation is a bit misleading. The correct endpoint for creating a new processor is a POST to process-groups/{process_group_id}/processors. It is listed in the docs under "Process Groups" rather than under "Processors", despite the description there.
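For a processor created from scratch rather than copied, the request body can be quite small. The sketch below only builds the URL and payload; `build_create_request` is a hypothetical helper, the class name `org.apache.nifi.processors.standard.PutSQL` is my assumption for the PutSQL processor type, and the position values are arbitrary, so check all of them against your NiFi version.

```python
def build_create_request(base_url: str, process_group_id: str,
                         processor_type: str, name: str):
    """Return (url, payload) for creating a new processor in a process group.

    base_url is the REST root, e.g. the value of nifi_url_prd.
    """
    url = base_url.rstrip('/') + '/process-groups/' + process_group_id + '/processors'
    payload = {
        "revision": {"version": 0},          # a new component starts at version 0
        "component": {
            "type": processor_type,          # fully qualified processor class (assumed)
            "name": name,
            "position": {"x": 0.0, "y": 0.0},  # canvas position, arbitrary here
        },
    }
    return url, payload


# Typical use (not executed here):
# url, payload = build_create_request(nifi_url_prd, process_group,
#                                     "org.apache.nifi.processors.standard.PutSQL",
#                                     "My PutSQL")
# response = requests.post(url, json=payload, headers=header)
```

Properties and scheduling would then be set on the created processor, for example with the PUT "Updates a processor" call from the question.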

For copying an existing processor, the following worked for me. I had to adapt the JSON a bit first, mainly by deleting any IDs that belong to the DEV environment.

import json
import requests

# GET the processor from DEV and parse the response body as JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id, headers=header)
processor = json.loads(response.content)

# Remove DEV-specific identifiers before creating the processor on PRD
del processor["id"]               # Processor ID cannot be specified.
del processor["uri"]              # URI refers to the DEV instance.
del processor["component"]["id"]  # Processor ID cannot be specified.

# Parent process group id should not be specified.
# If specified, it must be the same as the one in the POST URI.
del processor["component"]["parentGroupId"]

processor['revision']['version'] = 0  # reset version
payload = json.dumps(processor).encode('utf8')

# POST the processor to the target process group on PRD
response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors',
                         data=payload,
                         headers=header)
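Since the goal was re-usability, the clean-up step can be wrapped in a small function that works on a copy, so the entity fetched from DEV stays untouched. This is only a sketch of the same deletions as above; `prepare_for_create` is a hypothetical name, and it assumes the entity has the same shape as the GET response used in this answer.

```python
import copy


def prepare_for_create(entity: dict) -> dict:
    """Return a copy of a processor entity with environment-specific IDs removed."""
    new_entity = copy.deepcopy(entity)

    # Top-level identifiers that belong to the source instance
    for key in ("id", "uri"):
        new_entity.pop(key, None)

    # Component identifiers; the parent group is given by the POST URI instead
    component = new_entity.get("component", {})
    for key in ("id", "parentGroupId"):
        component.pop(key, None)

    # Reset the revision version, as in the code above
    new_entity.setdefault("revision", {})["version"] = 0
    return new_entity
```

The result can be serialised with json.dumps and posted to process-groups/{process_group_id}/processors exactly as shown above, once per processor to be copied.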

Upvotes: 0
