Kevin McPhilamy

Reputation: 1

Need to extract attributes directly from Avro using NiFi

I have found no way in NiFi to extract attributes directly from Avro, so I am using ConvertAvroToJson -> EvaluateJsonPath -> ConvertJsonToAvro as a workaround.

I would like to try a script in an ExecuteScript processor that extracts the attributes straight from the Avro flow file, to see whether that is a better approach.

Does anyone have a script to do this? Otherwise, I may end up using the original approach.

Thanks,

Kevin

Upvotes: 0

Views: 3460

Answers (2)

Andy

Reputation: 14194

If you are extracting simple patterns from a single Avro record per flowfile, ExtractText may be sufficient for you. If you want to take advantage of the new record processing available in Apache NiFi 1.3.0, AvroReader is where you should start, and there is a series of blog posts describing this process in detail. You can also extract Avro metadata with ExtractAvroMetadata.

Upvotes: 0

mattyb

Reputation: 12083

Here's a Groovy script (which needs the Avro JAR in its Module Directory property) where I let the user specify dynamic properties with JSONPath-style expressions to be evaluated against the Avro file. Each dynamic property is named avro.path.<attribute name> and its value is a dot-separated field path; the script only reads the first record in the flow file. Ironically it does a GenericData.toString(), which converts the record to JSON anyway, but perhaps there is some code in here you could reuse:

import org.apache.avro.*
import org.apache.avro.generic.*
import org.apache.avro.file.*
import groovy.json.*
import org.apache.nifi.processor.io.InputStreamCallback

flowFile = session.get()
if (!flowFile) return

final GenericData genericData = GenericData.get()
slurper = new JsonSlurper().setType(JsonParserType.INDEX_OVERLAY)

// Dynamic properties named avro.path.<attribute> hold dot-separated field paths
pathAttrs = this.binding?.variables?.findAll { attr -> attr.key.startsWith('avro.path.') }
newAttrs = [:]
try {
    session.read(flowFile, { inputStream ->
        def reader = new DataFileStream<>(inputStream, new GenericDatumReader<GenericRecord>())
        try {
            // Only the first record in the flow file is examined
            if (reader.hasNext()) {
                GenericRecord currRecord = reader.next()
                // GenericData.toString() renders the record as JSON text
                def json = genericData.toString(currRecord)
                log.info(json)
                def record = slurper.parseText(json)
                pathAttrs?.each { k, v ->
                    // Walk the path one field at a time
                    def object = record
                    v.value.tokenize('.').each {
                        object = object[it]
                    }
                    // Attribute name is the property name minus the avro.path. prefix
                    newAttrs[k - 'avro.path.'] = String.valueOf(object)
                }
            }
        } finally {
            // Close the reader even if parsing or path evaluation fails
            reader.close()
        }
    } as InputStreamCallback)
    newAttrs.each { k, v ->
        flowFile = session.putAttribute(flowFile, k, v)
    }
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Error during Avro Path: {}", [e.message] as Object[], e)
    session.transfer(flowFile, REL_FAILURE)
}
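
To make the path handling easier to follow, here is a minimal standalone sketch of just the lookup step, runnable outside NiFi. The sample record and the two property names (avro.path.city, avro.path.id) are made up for illustration; in ExecuteScript the property values would arrive as PropertyValue objects rather than plain strings. Note that the path is a plain dot-separated list of field names, not full JSONPath, so array indexes and filters are not supported.

```groovy
import groovy.json.JsonSlurper

// Hypothetical record, in the JSON form GenericData.toString() would produce
def json = '{"user": {"name": "kevin", "address": {"city": "Boston"}}, "id": 7}'
def record = new JsonSlurper().parseText(json)

// Hypothetical dynamic properties: property name -> dot-separated field path
def pathProps = [
    'avro.path.city': 'user.address.city',
    'avro.path.id'  : 'id'
]

def newAttrs = [:]
pathProps.each { name, path ->
    // Descend one field per path segment
    def value = record
    path.tokenize('.').each { field -> value = value[field] }
    // Strip the prefix to get the attribute name; attribute values are strings
    newAttrs[name - 'avro.path.'] = String.valueOf(value)
}

println newAttrs
```

With these inputs the flow file would end up with the attributes city = Boston and id = 7. A path that names a missing intermediate field will fail partway through the walk, which in the full script sends the flow file to failure.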

If you meant to extract Avro metadata vs fields (not totally sure what you meant by "attributes"), also check MergeContent's AvroMerge, as there is some code in there to pull Avro metadata.
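
For the metadata case, a rough sketch of reading the file header with the Avro API might look like the following. This is not the MergeContent code, just an illustration that assumes the Avro JAR is on the classpath; the schema, the record, and the source.system metadata key are invented so the example can build its own sample file in memory. Inside ExecuteScript you would wrap the input stream from session.read() instead of a ByteArrayInputStream.

```groovy
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord

// Hypothetical schema, used only to build a sample Avro file in memory
def schema = new Schema.Parser().parse(
    '{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}')

def out = new ByteArrayOutputStream()
def writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))
writer.setMeta('source.system', 'demo')   // user metadata must be set before create()
writer.create(schema, out)
def rec = new GenericData.Record(schema)
rec.put('name', 'kevin')
writer.append(rec)
writer.close()

// Opening the stream parses the header, so metadata is available
// without iterating over any records
def reader = new DataFileStream<GenericRecord>(
    new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>())
def meta = [:]
reader.metaKeys.each { key -> meta[key] = reader.getMetaString(key) }
meta['schema.name'] = reader.schema.name
reader.close()

println meta
```

Each entry in meta could then be copied onto the flow file with session.putAttribute(), the same way the field script does. The avro.schema key holds the writer schema as JSON text, so it can be large.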

Upvotes: 2
