Reputation: 1
I have found no way in NiFi to extract attributes directly from Avro so I am using ConvertAvroToJson -> EvaluateJsonPath -> ConvertJsonToAvro as the workaround.
However, I would like to write a script for an ExecuteScript processor that extracts the attributes directly from the Avro flow file, so I can determine whether that is a better approach.
Does anyone have a script to do this? Otherwise, I may end up using the original approach.
Thanks,
Kevin
Upvotes: 0
Views: 3460
Reputation: 14194
If you are extracting simple patterns from a single Avro record per flowfile, ExtractText may be sufficient for you. If you want to take advantage of the new record processing available in Apache NiFi 1.3.0, AvroReader is where you should start, and there is a series of blogs describing this process in detail. You can also extract Avro metadata with ExtractAvroMetadata.
Upvotes: 0
Reputation: 12083
Here's a Groovy script (which needs the Avro JAR in its Module Directory property) where I let the user specify dynamic properties, named avro.path.<attribute name>, with JSONPath-style dotted paths to be evaluated against the Avro file. Ironically it does a GenericData.toString() which converts the record to JSON anyway, but perhaps there is some code in here you could reuse:
import org.apache.avro.*
import org.apache.avro.generic.*
import org.apache.avro.file.*
import groovy.json.*
import org.apache.commons.io.IOUtils
import java.nio.charset.*

flowFile = session.get()
if(!flowFile) return

final GenericData genericData = GenericData.get()
slurper = new JsonSlurper().setType(JsonParserType.INDEX_OVERLAY)
// Dynamic properties are bound as script variables; keep only the avro.path.* ones
pathAttrs = this.binding?.variables?.findAll {attr -> attr.key.startsWith('avro.path')}
newAttrs = [:]
try {
    session.read(flowFile, { inputStream ->
        def reader = new DataFileStream<>(inputStream, new GenericDatumReader<GenericRecord>())
        // Only the first record in the flow file is inspected
        if(reader.hasNext()) {
            GenericRecord currRecord = reader.next()
            log.info(genericData.toString(currRecord))
            // GenericData.toString() renders the record as JSON, which is then parsed
            def record = slurper.parseText(genericData.toString(currRecord))
            pathAttrs?.each {k,v ->
                // Walk the dotted path (e.g. a.b.c) one field at a time
                def object = record
                v.value.tokenize('.').each {
                    object = object[it]
                }
                // Attribute name is the property name without the avro.path. prefix
                newAttrs[k - "avro.path."] = String.valueOf(object)
            }
        }
        // Close the reader even when the file contains no records
        reader.close()
    } as InputStreamCallback)
    newAttrs.each{k,v ->
        flowFile = session.putAttribute(flowFile, k,v)
    }
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error("Error during Avro Path: {}", [e.message] as Object[], e)
    session.transfer(flowFile, REL_FAILURE)
}
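To see what the path lookup in that script does, here is a minimal sketch that runs outside NiFi in plain Groovy. The JSON string stands in for the output of genericData.toString(currRecord), and the map stands in for the dynamic properties. The field names and property names are made up for illustration, and the values are plain strings here rather than the PropertyValue objects NiFi passes to the script.

```groovy
import groovy.json.JsonSlurper

// Stand-in for genericData.toString(currRecord); the field names are hypothetical.
def json = '{"id": 42, "user": {"name": "kevin", "address": {"city": "Boston"}}}'
def record = new JsonSlurper().parseText(json)

// Stand-in for the dynamic properties: property name -> dotted path into the record.
def pathAttrs = [
    'avro.path.userName': 'user.name',
    'avro.path.city'    : 'user.address.city',
    'avro.path.id'      : 'id'
]

def newAttrs = [:]
pathAttrs.each { k, v ->
    // Start at the record root and descend one field per path segment.
    def object = record
    v.tokenize('.').each { object = object[it] }
    // Strip the avro.path. prefix to get the flow file attribute name.
    newAttrs[k - 'avro.path.'] = String.valueOf(object)
}

println newAttrs
```

So a property named avro.path.city with the value user.address.city would end up as a flow file attribute named city. Note that this is a plain dotted-path walk, not full JSONPath: array indexes, wildcards and filters are not handled.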
If you meant to extract Avro metadata rather than fields (I'm not totally sure what you meant by "attributes"), also check MergeContent's AvroMerge, as there is some code in there to pull Avro metadata.
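For the metadata case, a rough sketch of reading the schema name and a metadata entry from an Avro container file with DataFileStream could look like the following. This is not the MergeContent code, only an illustration of the Avro calls involved. It is written as a standalone Groovy script that builds a tiny Avro file in memory first; the schema, the "origin" metadata key and the Avro version in @Grab are assumptions made for the example. Inside ExecuteScript you would read from the flow file's input stream instead and rely on the Module Directory for the Avro JAR.

```groovy
// The Avro version below is an assumption; use whichever version matches your NiFi.
@Grab('org.apache.avro:avro:1.11.3')
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord

// Hypothetical schema used only to produce some sample data.
def schema = new Schema.Parser().parse(
    '{"type":"record","name":"User","namespace":"example",' +
    '"fields":[{"name":"name","type":"string"}]}')

// Write a one-record Avro container file into memory, with a custom metadata entry.
def out = new ByteArrayOutputStream()
def writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))
writer.setMeta('origin', 'demo')   // metadata must be set before create()
writer.create(schema, out)
def rec = new GenericData.Record(schema)
rec.put('name', 'kevin')
writer.append(rec)
writer.close()

// Read the file header back: the schema and metadata are available without reading records.
def reader = new DataFileStream<GenericRecord>(
    new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>())
def meta = [
    'schema.name': reader.schema.fullName,
    'origin'     : reader.getMetaString('origin')
]
reader.close()

println meta
```

In a NiFi script the entries of meta would then be copied onto the flow file with session.putAttribute, the same way newAttrs is handled in the script above.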
Upvotes: 2