Dinosaurius

Reputation: 8628

How to properly merge multiple FlowFiles?

I use MergeContent 1.3.0 to merge FlowFiles from two sources: 1) ListenHTTP and 2) QueryElasticsearchHTTP.

The problem is that the merge result is just a concatenation of JSON strings. How can I convert them into a single JSON string?

{"event-date":"2017-08-08T00:00:00"}{"event-date":"2017-02-23T00:00:00"}{"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

I would like to get this result:

{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4} 

Is it possible?

UPDATE:

After changing the data structure in Elasticsearch, I was able to get the following output from MergeContent. Now I have a common field eid in both JSON strings. I would like to merge these strings by eid in order to get a single JSON file. Which processor should I use?

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

I need to get the following output:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}

It was suggested to use ExecuteScript to merge files. However, I cannot figure out how to do this. This is what I tried:

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "eid": obj['eid'],
            "zid": obj['zid'],
            ...
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile1 = session.get()
flowFile2 = session.get()
if (flowFile1 is not None and flowFile2 is not None):
    # WHAT SHOULD I PUT HERE??
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
session.commit()

Upvotes: 1

Views: 2915

Answers (2)

daggett

Reputation: 28564

An example of how to read multiple files from the incoming queue using filtering

Assume you have multiple pairs of flow files with the following content:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}

and

{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

The same value of the eid field links the two files of a pair.

Before merging, we have to extract the value of the eid field and put it into an attribute of the flow file for fast filtering.

Use an EvaluateJsonPath processor with these properties:

Destination :  flowfile-attribute 
eid         :  $.eid

After this, you'll have a new eid attribute on the flow file.
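For the two example files above, both flow files end up with eid = 1, which is exactly the attribute the script below uses to find matching files in the queue.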

Then use an ExecuteScript processor with Groovy as the script engine and the following code:

import org.apache.nifi.processor.FlowFileFilter
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.flowfile.FlowFile
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

//get first flow file
def ff0 = session.get()
if(!ff0)return

def eid = ff0.getAttribute('eid')

//try to find files with same attribute in the incoming queue
def ffList = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff) {
        if( eid == ff.getAttribute('eid') )return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

//let's assume each pair consists of two files, so we need one more file with the same eid in the queue

if( !ffList || ffList.size()<1 ){
    //fewer files than required: roll back the session and penalize the retrieved file,
    //so it goes back to the incoming queue with the pre-configured penalty delay (default 30 sec)
    session.rollback(true)
    return
}

//let's put all in one list to simplify later iterations
ffList.add(ff0)

if( ffList.size()>2 ){
    //unexpected situation: there are more files than expected,
    //so redirect all of them to failure
    session.transfer(ffList, REL_FAILURE)
    return
}

//create an empty map (i.e. a JSON object)
def json = [:]
//iterate through the files, parse each one, and merge the JSON keys
ffList.each{ff->
    session.read(ff).withStream{rawIn->
        def fjson = new JsonSlurper().parse(rawIn)
        json.putAll(fjson)
    }
}
//create a new flow file and write the merged JSON as its content
def ffOut = session.create()
ffOut = session.write(ffOut,{rawOut->
    rawOut.withWriter("UTF-8"){writer->
        new JsonBuilder(json).writeTo(writer)
    }
} as OutputStreamCallback )
//set mime-type
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

session.remove(ffList)
session.transfer(ffOut, REL_SUCCESS)
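
For the example pair above, this produces a single flow file whose content is the merged object:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4,"dates":{"event-date":["2017-08-08","2017-02-23"]}}

Since the script simply calls putAll on one map, a key that appears in both files (other than eid) would be overwritten by whichever file is processed last.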

Upvotes: 4

Bryan Bende

Reputation: 18630

Joining together two different types of data is not really what MergeContent was made to do.
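
As an aside, if a plain JSON array were acceptable, MergeContent could at least produce valid JSON with Delimiter Strategy set to Text and the following properties:

Header     :  [
Demarcator :  ,
Footer     :  ]

That yields [{...},{...}], which is well-formed, but it still does not join records by eid.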

You would need to write a custom processor, or custom script, that understood your incoming data formats and created the new output.

If you have ListenHttp connected to QueryElasticSearchHttp, meaning that you are triggering the query based on the flow file coming out of ListenHttp, then you may want to make a custom version of QueryElasticSearchHttp that takes the content of the incoming flow file and joins it together with any of the outgoing results.

Here is where the query result is currently written to a flow file:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java#L360

Another option is to use ExecuteScript and write a script that could take multiple flow files and merge them together in the way you described.
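
For completeness, here is a minimal Jython sketch of that idea. It assumes the two flow files of a pair always arrive together in the queue (no filtering by eid, unlike the Groovy answer above), and WriteMerged is just an illustrative helper name:

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback

class WriteMerged(OutputStreamCallback):
    # writes the merged dict as JSON into the new flow file
    def __init__(self, obj):
        self.obj = obj
    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

# pull up to two flow files from the incoming queue
flowFiles = session.get(2)
if len(flowFiles) < 2:
    # incomplete pair so far; penalize and return the files to the queue
    session.rollback(True)
else:
    merged = {}
    for ff in flowFiles:
        # read each flow file's content and fold its keys into one dict
        inputStream = session.read(ff)
        merged.update(json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8)))
        inputStream.close()
    out = session.create()
    out = session.write(out, WriteMerged(merged))
    out = session.putAttribute(out, "mime.type", "application/json")
    session.remove(flowFiles)
    session.transfer(out, REL_SUCCESS)

If the pairs are not guaranteed to arrive adjacent in the queue, the eid-filtering approach shown in the other answer is the safer route.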

Upvotes: 4
