Félicien

Reputation: 256

NiFi how to clear all queues

I'm currently using NiFi 1.5.0 (the behavior is the same in previous versions), and I wonder whether there is a way to clear all queues at once.

As the number of processors grows, emptying every queue one by one takes a long time.

(I already know how to clear a single queue: see "How to clear NiFi queues?")

I'm looking for a solution using either the UI or the API.

Upvotes: 5

Views: 8727

Answers (4)

Saikat

Reputation: 16930

In the latest versions of NiFi, there's an easy way.

With nothing selected (no processors or connections), right-click on the canvas and select Empty all queues.

Upvotes: 0

tomo.hirano

Reputation: 71

I confirmed that these APIs empty all queues in a process group, including the queues of child process groups nested several levels below the target process group.

ref: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

POST: /process-groups/{id}/empty-all-connections-requests => Creates a request to drop all flowfiles of all connection queues in this process group.

GET: /process-groups/{id}/empty-all-connections-requests/{drop-request-id} => Gets the current status of a drop all flowfiles request.

DELETE: /process-groups/{id}/empty-all-connections-requests/{drop-request-id} => Cancels and/or removes a request to drop all flowfiles.
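The three calls above can be chained into one helper. This is a minimal sketch, not a tested client: it assumes an unsecured NiFi at http://localhost:8080/nifi-api, and it assumes the responses wrap the request status in a "dropRequest" object with "id" and "finished" fields. The names call and empty_all_queues are made up for illustration.

```python
import json
import time
import urllib.request

# Assumption: unsecured local NiFi; adjust for your host, port and auth.
BASE_URL = "http://localhost:8080/nifi-api"


def call(method, url):
    """Send one body-less request and decode the JSON reply."""
    req = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def empty_all_queues(pg_id, base_url=BASE_URL, send=call, poll_seconds=1.0):
    """Drop all queued flowfiles under a process group and wait until done."""
    requests_url = f"{base_url}/process-groups/{pg_id}/empty-all-connections-requests"
    # POST creates the drop request (assumed reply shape: {"dropRequest": {...}}).
    drop = send("POST", requests_url)["dropRequest"]
    status_url = f"{requests_url}/{drop['id']}"
    try:
        # GET polls the request until NiFi reports it as finished.
        while not drop.get("finished"):
            time.sleep(poll_seconds)
            drop = send("GET", status_url)["dropRequest"]
    finally:
        # DELETE removes the request once we stop tracking it.
        send("DELETE", status_url)
    return drop
```

Calling empty_all_queues("<process-group-id>") with the id of the target process group would then empty every queue below it. The send parameter only exists so the HTTP layer can be swapped out, for example to add authentication headers.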

Upvotes: 0

Chaffelson

Reputation: 1269

I haven't had time to test this thoroughly, but it should work:

# In your Linux shell (NiPyAPI is a Python 2/3 SDK for the NiFi API)
pip install nipyapi
python

# In Python
from nipyapi import config, canvas, nifi

# Get a flat list of all process groups
pgs = canvas.list_all_process_groups()

# Collect the connections of every process group into one flat list
cons = []
for pg in pgs:
    cons += nifi.ProcessgroupsApi().get_connections(pg.id).connections

# Issue a drop request for every connection in every process group
# (newer NiPyAPI releases may spell these classes ProcessGroupsApi / FlowfileQueuesApi)
for con in cons:
    nifi.FlowfilequeuesApi().create_drop_request(con.id)

Edit: I went ahead and implemented this as it seems useful: https://github.com/Chaffelson/nipyapi/issues/45

import nipyapi
pg = nipyapi.canvas.get_process_group('MyProcessGroup')
nipyapi.canvas.purge_process_group(pg, stop=True)

Setting stop=True deschedules (stops) the process group before purging it.

Upvotes: 5

Bryan Bende

Reputation: 18670

If you want to get rid of all your data completely, you can stop NiFi and remove all of the "_repository" directories (flow file, content, and provenance). This effectively resets all data in your NiFi instance.
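A rough sketch of that reset as a script, to be run only while NiFi is stopped. It assumes the repositories use the default directory names (flowfile_repository, content_repository, provenance_repository) directly under the NiFi home directory. If nifi.properties points them elsewhere, the paths need adjusting. The function name remove_data_repositories is made up for illustration.

```python
import shutil
from pathlib import Path

# Assumption: default repository directory names under the NiFi home directory.
DATA_REPOSITORIES = (
    "flowfile_repository",
    "content_repository",
    "provenance_repository",
)


def remove_data_repositories(nifi_home):
    """Delete the data repositories under nifi_home; return the paths removed."""
    removed = []
    for name in DATA_REPOSITORIES:
        repo = Path(nifi_home) / name
        if repo.is_dir():
            shutil.rmtree(repo)  # irreversible: all queued data is lost
            removed.append(repo)
    return removed
```

For example, remove_data_repositories("/opt/nifi") would wipe the three directories under a hypothetical /opt/nifi install and leave conf and everything else in place.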

Upvotes: 3
