Luckylukee

Reputation: 595

NiFi record counts

I am getting files from a remote server using NiFi. My files look like this:

timestamp (ms),nodeID,value
12345,x,12.4
12346,x,12.7
12348,x,13.4
12356,x,13.6
12355,y,12.0

Right now I just get and fetch the files, split the lines, and send them to Kafka. Beforehand, however, I need to apply a checksum approach to my records and aggregate them by timestamp. What I need is to add an additional column to the content that counts the records aggregated by timestamp window and nodeID, for example per 10 milliseconds:

timestamp (ms),nodeID,value,counts
12345,x,12.4,3
12346,x,12.7,3
12348,x,13.4,3
12356,x,13.6,1
12355,y,12.0,1
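
To make the intent clearer, here is a minimal sketch of the aggregation logic I am after, written in plain Python outside NiFi: group records by nodeID and 10 ms bucket, then append the count as a fourth column. It reproduces the expected output above.

    import csv
    import io
    from collections import Counter

    raw = """12345,x,12.4
    12346,x,12.7
    12348,x,13.4
    12356,x,13.6
    12355,y,12.0
    """

    rows = list(csv.reader(io.StringIO(raw.replace("    ", ""))))

    # Bucket key: (nodeID, timestamp // 10) identifies a 10 ms window per node
    counts = Counter((node, int(ts) // 10) for ts, node, value in rows)

    # Emit each record with the count of its (nodeID, window) group appended
    for ts, node, value in rows:
        print(f"{ts},{node},{value},{counts[(node, int(ts) // 10)]}")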

How can I do the above in NiFi? I am totally new to NiFi, but I need to add this functionality to my flow. I am currently using the process shown below. [screenshot of the current flow]

Upvotes: 0

Views: 4605

Answers (1)

Bryan Bende

Reputation: 18670

This may not answer your question directly, but you should consider refactoring your flow to use the "record" processors. It would greatly simplify things and would probably get you closer to being able to do the aggregation.

The idea is to not split up the records, but instead process them in place. Given your current flow, the 4 processors after FetchSFTP would likely change to a single ConvertRecord processor that converts CSV to JSON. You would first need to define a simple Avro schema for your data.
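
For example, a minimal schema along these lines could describe the three columns (the record name and field names here are placeholders inferred from the sample data, not something NiFi prescribes):

    {
      "type": "record",
      "name": "NodeReading",
      "fields": [
        { "name": "timestamp", "type": "long" },
        { "name": "nodeID", "type": "string" },
        { "name": "value", "type": "double" }
      ]
    }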

Once you have the record processing set up, you might be able to use PartitionRecord to partition the records by the node id, and then from there the missing piece would be how to count by the timestamps.
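
One option to explore for that last piece is QueryRecord, which runs SQL against the records in a flow file. Whether the window function below works depends on the Calcite version bundled with your NiFi release, so treat it as a sketch to test rather than a confirmed recipe. The integer division forms the 10 ms buckets:

    SELECT "timestamp", nodeID, "value",
           COUNT(*) OVER (PARTITION BY nodeID, "timestamp" / 10) AS counts
    FROM FLOWFILE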

Some additional resources...

https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

https://bryanbende.com/development/2017/06/20/apache-nifi-records-and-schema-registries

https://www.slideshare.net/BryanBende/apache-nifi-record-processing

Upvotes: 3
