Georg Heiler

Reputation: 17676

Custom NiFi processor: writing of flow file

I want to create a custom NiFi processor that reads Esri ASCII grid files and returns a CSV-like representation, with some metadata per file and the geo-referenced user data in WKT format.

Unfortunately, the parsed result is not written back as an updated flow file.

https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107 is my attempt at making this happen in NiFi.

Unfortunately, only the original files are returned. The converted output is not persisted.

When trying to adapt it to manually serialize some CSV strings like:

  val lineSep = System.getProperty("line.separator")
  val csvResult = result.map(p => p.productIterator.map{
    case Some(value) => value
    case None => ""
    case rest => rest
  }.mkString(";")).mkString(lineSep)

  var output = session.write(flowFile, new OutputStreamCallback() {
    @throws[IOException]
    def process(outputStream: OutputStream): Unit = {
      IOUtils.write(csvResult, outputStream, "UTF-8")
    }
  })

still no flow files are written. Either the issue from above persists, or I get "Stream not closed" exceptions for the outputStream.

Something small must be missing, but I can't find it.

Upvotes: 1

Views: 2079

Answers (1)

daggett

Reputation: 28564

Every session method that changes a flow file, such as session.write(), returns a new version of that flow file, and it is this new version that you have to transfer. The original reference still points to the unmodified content.

If you change the flow file inside your converterIngester() function, you have to return the new version to the calling function so that it can be transferred to the relationship.
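
As a minimal sketch of that pattern (not the code from the linked repository): the helper below returns the flow file produced by session.write(), and onTrigger transfers that returned version. The names CsvWritingProcessor, writeCsv, toCsv and RelSuccess, as well as the sample row, are made up for illustration; only the ProcessSession calls are NiFi API, and the nifi-api dependency is assumed to be on the classpath.

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets

import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.{AbstractProcessor, ProcessContext, ProcessSession, Relationship}
import org.apache.nifi.processor.io.OutputStreamCallback

object CsvWritingProcessor {
  // Hypothetical relationship, for illustration only.
  val RelSuccess: Relationship = new Relationship.Builder()
    .name("success")
    .description("Flow files whose content was replaced by CSV")
    .build()

  // Same serialization idea as in the question: unwrap Options, join fields with ';'.
  def toCsv(rows: Seq[Product]): String =
    rows.map(_.productIterator.map {
      case Some(value) => value
      case None        => ""
      case rest        => rest
    }.mkString(";")).mkString(System.lineSeparator())
}

// Hypothetical processor showing the "return and transfer the new version" pattern.
class CsvWritingProcessor extends AbstractProcessor {
  import CsvWritingProcessor._

  override def getRelationships: java.util.Set[Relationship] =
    java.util.Collections.singleton(RelSuccess)

  override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
    val original = session.get()
    if (original != null) {
      // Placeholder data; a real processor would parse the incoming content here.
      val csv = toCsv(Seq((1, Some("POINT (0 0)"), None)))
      // Keep the flow file returned by the helper and transfer that one,
      // not the reference obtained from session.get().
      val updated = writeCsv(session, original, csv)
      session.transfer(updated, RelSuccess)
    }
  }

  // Returns the NEW flow file version produced by session.write().
  private def writeCsv(session: ProcessSession, flowFile: FlowFile, csv: String): FlowFile =
    session.write(flowFile, new OutputStreamCallback {
      override def process(out: OutputStream): Unit =
        out.write(csv.getBytes(StandardCharsets.UTF_8))
    })
}
```

The same rule applies to other modifying calls such as session.putAttribute(): keep the returned flow file and pass that one on.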

Upvotes: 4
