Bob C.
Bob C.

Reputation: 141

Monitor backpressure count and size in custom processor

I have a custom processor (NiFi 1.8.0) that already modifies incoming flow files as needed. However before transferring the file to the outgoing relationship I would like to check if that relationship's backpressure is close to exceeding it's threshold. If it is then I plan to send the flow file to another relationship that connects to a PutFile processor where it will be written to disk.

I know I can get the incoming queue count and size. But I can't figure out how to get count and size from the outgoing relationship's connection.

Upvotes: 0

Views: 674

Answers (2)

Bob C.
Bob C.

Reputation: 141

I ended up finding the connections from the ProcessGroupStatus object:

String myProcessorId = this.getIdentifier();
int queuedCount = 0;
float queuedBytes = 0;
ProcessGroupStatus processGroupStatus = ((EventAccess) getControllerServiceLookup().getControllerStatus();

  if (processGroupStatus.getConnectionStatus() != null {
    Collection < CollectionStatus > groupConnections = processGroupStatus.getConnectionStatus();

    // Now have to iterate through groupConnections to find the one where the connection's source ID = myProcessorId and  
    // the connection's name = 'normal output' (this is the name of a relationship I added)

    ArrayList connections = new ArrayList <> (groupConnections);
    for (Object processorConnection : connections) {
     ConnectionStatus connection = (ConnectionStatus) processorConnection;

     if (connection.getName().equals("normal output") && connections.getSourceId.equals(myProcessorId)) {
      // Now I can grab the current count and size of the 'normal output' relationship
      // The back pressure threshold values can be grabbed from the connection as well
      queuedCount = connection.getQueuedCount();
      queuedBytes = connection.getQueuedBytes();
      break;
     }
    }
   }

The above only retrieves connections from the parrent group. If the connection you're looking for is contained in a child group, you will need to iterate through the child groups:

ProcessGroupStatus processGroupStatus = ((EventAccess) getControllerServiceLookup().getControllerStatus();
ArrayList childProcessorGroups = new ArrayList < > (processGroupStatus.getProcessGroupStatus());
for (Object childProcessorGroup : childProcessorGroups) {
 ProcessGroupStatus childProcessGroupStatus = (ProcessGroupStatus) childProcessorGroup;
 Collection < CollectionStatus > groupConnections = childProcessGroupStatus.getConnectionStatus();
 // Then iterate through groupConnections as above
}

The NiFi getControllerServiceLookup() does show an 'allConnections' variable which contains all connections across all processors in all groups. But there doesn't appear to be a getter for it. If there was a getter for it, you wouldn't have to worry about which group to look in for connections. You could simply iterate through 'allConnections' and look for the connection matching your processor ID and relationship name.

Upvotes: 0

Pushkr
Pushkr

Reputation: 3619

There is a controller service available called - SiteToSiteStatusReportingTask which essentially sends the status of each and every event that is happening in Nifi.

If you look at the data structure it returns , you can see it has few very helpful attributes on detecting backpressure -

// fields for connections
{ "name" : "sourceId", "type" : ["string", "null"]},
{ "name" : "sourceName", "type" : ["string", "null"]},
{ "name" : "destinationId", "type" : ["string", "null"]},
{ "name" : "destinationName", "type" : ["string", "null"]},
{ "name" : "maxQueuedBytes", "type" : ["long", "null"]},
{ "name" : "maxQueuedCount", "type" : ["long", "null"]},
{ "name" : "queuedBytes", "type" : ["long", "null"]},
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},

You can use this information to derive what you need. Refer this article for more details on implementation

Upvotes: 1

Related Questions