Reputation: 508
I have been trying to access NiFi FlowFile attributes from a Kafka message in Spark Streaming. I am using Java.
The scenario is that NiFi reads binary files from an FTP location using the GetSFTP processor and publishes byte[] messages to Kafka using the PublishKafka processor. These byte[] messages are decoded to ASCII by a Spark Streaming job, and the decoded ASCII is written back to Kafka for further processing and saved to HDFS using a NiFi processor.
My problem is that I cannot keep track of which binary file a decoded ASCII file came from. I have to add a header section (filename, file size, record count, etc.) to my decoded ASCII, but I have not been able to figure out how to access the NiFi FlowFile's filename from the KafkaConsumer object. Is there a way to do this using standard NiFi processors? Any other suggestions for achieving this would also be welcome. Thanks.
Upvotes: 1
Views: 1395
Reputation: 18630
So your data flow is:
FTP -> NiFi -> Kafka -> Spark Streaming -> Kafka -> NiFi -> HDFS ?
Currently Kafka doesn't have metadata attributes on each message (although I believe this may be coming in Kafka 0.11), so when NiFi publishes a message to a topic, it can't pass along the flow file attributes with the message.
You would have to construct some type of wrapper data format (maybe JSON or Avro) that contained the original content + the additional attributes you need, so that you could publish that whole thing as the content of one message to Kafka.
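As a rough illustration of the wrapper idea, here is a minimal sketch in Java. It uses a simple length-prefixed binary envelope built with `DataOutputStream` rather than JSON or Avro, only to stay dependency-free; the class name `FlowFileEnvelope` and its methods are hypothetical, and carrying just the filename is an assumption (other attributes could be added the same way). How the envelope gets built on the NiFi side (for example a scripted or custom processor) is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical envelope: the filename plus the original binary content,
// packed into a single byte[] that can travel as one Kafka message value.
final class FlowFileEnvelope {
    final String filename;
    final byte[] content;

    FlowFileEnvelope(String filename, byte[] content) {
        this.filename = filename;
        this.content = content;
    }

    // Producer side: pack the filename and content into one byte[].
    static byte[] wrap(String filename, byte[] content) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(filename);       // 2-byte length prefix + modified UTF-8
            out.writeInt(content.length); // 4-byte content length
            out.write(content);           // raw binary payload
        }
        return buffer.toByteArray();
    }

    // Consumer side (e.g. inside the Spark Streaming job): recover both parts.
    static FlowFileEnvelope unwrap(byte[] message) throws IOException {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(message))) {
            String filename = in.readUTF();
            byte[] content = new byte[in.readInt()];
            in.readFully(content);
            return new FlowFileEnvelope(filename, content);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] original = {0x01, 0x02, (byte) 0xFF};
        FlowFileEnvelope e = unwrap(wrap("input.bin", original));
        System.out.println(e.filename + " " + Arrays.equals(e.content, original));
    }
}
```

In the Spark job you would call `unwrap` on each message value, decode `content` to ASCII, and use `filename` when building the header section. If you go with JSON instead, the binary content would need to be Base64-encoded, since JSON strings cannot hold arbitrary bytes.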
Also, I don't know exactly what you are doing in your Spark Streaming job, but is there a reason you can't just do that part in NiFi? It doesn't sound like anything complex involving windowing or joins, so you could potentially simplify things a bit and have NiFi do the decoding, then have NiFi write the result to Kafka and to HDFS.
Upvotes: 2