Reputation: 603
I am pretty new to NiFi. We already have a setup in place where we are able to consume Kafka messages. In the NiFi UI, I created a processor with ConsumeKafka_0_10. When messages are published (by a different process), my processor picks up the required data/messages properly. I go to "Data provenance" and can see that the correct data is received.
However, I want the next process to be a validator that reads the flowfile from ConsumeKafka and does basic validation (a user-supplied script would be ideal).
How do we do that, or which processor works here?
Also, is there any way to convert the flowfile content into CSV or JSON format?
Upvotes: 1
Views: 1274
Reputation: 14194
You have a few options. Depending on the flowfile content format, you can use ValidateRecord with a *Reader record reader controller service configured to validate it. If you already have a script to do this in Groovy/Javascript/Ruby/Python, ExecuteScript is also a solution.
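As a rough illustration of the ExecuteScript route, here is a minimal sketch of the kind of validation logic such a script might contain. The field names are hypothetical, and in a real ExecuteScript processor the bytes would come from the flowfile via the NiFi `session` object rather than a local variable:

```python
import json

# Hypothetical required fields -- adjust to your own message schema.
REQUIRED_FIELDS = ("id", "timestamp", "payload")

def validate_message(raw_bytes):
    """Return True if the message is valid JSON containing all required fields.

    In an ExecuteScript processor this logic would run against the
    flowfile content read from the session, and the flowfile would then
    be routed to REL_SUCCESS or REL_FAILURE based on the result.
    """
    try:
        record = json.loads(raw_bytes.decode("utf-8"))
    except (ValueError, UnicodeDecodeError):
        return False
    return all(field in record for field in REQUIRED_FIELDS)

print(validate_message(b'{"id": 1, "timestamp": "2018-01-01", "payload": "x"}'))
print(validate_message(b'not json'))
```

The advantage of ValidateRecord over a hand-rolled script like this is that the schema lives in a controller service, so validation stays declarative and reusable across flows.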
Similarly, to convert the flowfile content into CSV or JSON, use a ConvertRecord processor with a ScriptedReader and a CSVRecordSetWriter or JsonRecordSetWriter to output the correct format. These processors use the Apache NiFi record structure internally to convert between arbitrary input/output formats with high performance. Further reading is available at blogs.apache.org/nifi and bryanbende.com.
Upvotes: 1