Reputation: 17676
I want to create a custom NiFi processor which can read ESRI ASCII grid files and return a CSV-like representation with some metadata per file and geo-referenced user data in WKT format.
Unfortunately, the parsed result is not written back as an updated flow file.
My attempt at making this happen in NiFi is here: https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107
Unfortunately, only the original files are returned. The converted output is not persisted.
When trying to adapt it to manually serialize some CSV strings like:
    val lineSep = System.getProperty("line.separator")
    val csvResult = result.map(p => p.productIterator.map {
      case Some(value) => value
      case None => ""
      case rest => rest
    }.mkString(";")).mkString(lineSep)
    var output = session.write(flowFile, new OutputStreamCallback() {
      @throws[IOException]
      def process(outputStream: OutputStream): Unit = {
        IOUtils.write(csvResult, outputStream, "UTF-8")
      }
    })
still no flow files are written. Either the issue from above persists, or I get "Stream not closed" exceptions for the outputStream.
It must be a tiny bit which is missing, but I can't seem to find the missing bit.
Upvotes: 1
Views: 2079
Reputation: 28564
Each session method that changes a flow file, such as session.write(), returns a new version of the flow file, and you have to transfer this new version rather than the original.
If you change the flow file inside your converterIngester() function, that function has to return the new version to its caller, so that the caller can transfer it to the relationship.
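A minimal sketch of that pattern, assuming nifi-api is on the classpath. The names FlowFileRewrite, toCsv, writeCsv and convertAndTransfer are hypothetical and not taken from the linked processor; only session.get(), session.write() and session.transfer() are NiFi calls. The point is that the FlowFile returned by session.write() is the one passed to session.transfer():

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets

import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.{ProcessSession, Relationship}
import org.apache.nifi.processor.io.OutputStreamCallback

// Hypothetical helper object, for illustration only.
object FlowFileRewrite {

  // Serializes case classes / tuples to ';'-separated lines,
  // unwrapping Option fields as in the question's snippet.
  def toCsv(rows: Seq[Product]): String =
    rows.map(_.productIterator.map {
      case Some(value) => value
      case None        => ""
      case rest        => rest
    }.mkString(";")).mkString(System.lineSeparator())

  // Replaces the content of `flowFile` with `csv` and returns the NEW
  // FlowFile version produced by session.write().
  def writeCsv(session: ProcessSession, flowFile: FlowFile, csv: String): FlowFile =
    session.write(flowFile, new OutputStreamCallback {
      override def process(out: OutputStream): Unit =
        // Only write inside the callback; the session owns the stream.
        out.write(csv.getBytes(StandardCharsets.UTF_8))
    })

  // What the body of onTrigger could look like (assumption: `success`
  // is the processor's success relationship).
  def convertAndTransfer(session: ProcessSession,
                         success: Relationship,
                         rows: Seq[Product]): Unit = {
    val original = session.get()
    if (original != null) {
      // Keep the returned reference; `original` is now stale.
      val updated = writeCsv(session, original, toCsv(rows))
      session.transfer(updated, success)
    }
  }
}
```

Transferring `original` instead of `updated` here would match the symptom described in the question, where only the unmodified input files come out of the processor.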
Upvotes: 4