Reputation: 2943
I want to transfer my NiFi ETL pipeline (mainly PutSQL processors) from my development instance to my production instance of Apache NiFi, ideally with Python for reusability.
I thought I would give it a shot to just try and copy-paste them.
Code:
# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
, headers=header)
processor = json.loads(response.content)
# PUT processor
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id
, data=payload
, headers=header)
This failed on the PUT with an HTTP 409 Conflict error. I am guessing this is because I am trying to PUT a resource to a URI that expects a resource to already exist at that location.
The documentation lists "Create a processor, Set properties, Schedule" next to the processor APIs, but when looking into it, there is no dedicated API for creation - I decided to go with PUT because it says "Updates a processor" which is the closest thing I can see in there to creating a new one from scratch.
Do you have any ideas on how to create processors with Python? Either by copying existing ones or creating entirely new ones?
Upvotes: 0
Views: 497
Reputation: 2943
So the problem was that the API documentation is a bit misleading...
The correct endpoint for creating a new processor is a POST to process-groups/{process_group_id}/processors. It is listed in the docs under "Process Groups" rather than under "Processors", despite the description.
The following worked for me. It was necessary to adapt the JSON a bit, though, mainly to delete any IDs from the dev environment.
# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
, headers=header)
processor = json.loads(response.content)
# POST processor to the target process group
del processor["id"] # Processor ID cannot be specified.
del processor["uri"] # The URI points at the dev instance and contains the dev ID.
del processor["component"]["id"] # Processor ID cannot be specified.
del processor["component"]["parentGroupId"] # Parent process group ID should not be specified.
# If specified, the parent process group ID must be the same as the one in the POST URI.
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors'
, data=payload
, headers=header)
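For re-use across several processors, the clean-up step above could be wrapped in a small helper. This is only a sketch under assumptions: the function name `prepare_processor_for_create` and the sample entity are made up for illustration, and the sample contains only the fields touched in the snippet above, not a full NiFi processor entity.

```python
import copy
import json


def prepare_processor_for_create(entity):
    """Return a copy of a processor entity with the source-instance IDs removed.

    Hypothetical helper: it mirrors the manual `del` statements above, but
    works on a copy and tolerates keys that are already missing.
    """
    new_entity = copy.deepcopy(entity)  # leave the original GET result untouched
    new_entity.pop("id", None)
    new_entity.pop("uri", None)
    component = new_entity.get("component", {})
    component.pop("id", None)
    component.pop("parentGroupId", None)
    # Reset the revision version, as in the snippet above.
    new_entity.setdefault("revision", {})["version"] = 0
    return new_entity


# Made-up sample entity, reduced to the fields that matter here.
dev_entity = {
    "id": "dev-123",
    "uri": "http://dev.example/nifi-api/processors/dev-123",
    "revision": {"version": 7},
    "component": {"id": "dev-123", "parentGroupId": "dev-group", "name": "PutSQL"},
}

prepared = prepare_processor_for_create(dev_entity)
payload = json.dumps(prepared).encode("utf8")
```

The resulting `payload` would then be passed to the same `requests.post(...)` call on `process-groups/<id>/processors` as shown above.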
Upvotes: 0