I am trying to copy data from S3 to HDFS with NiFi. I have observed a couple of issues and have a few questions.
Processor ConvertJSONToAvro: if the flowfile is not valid JSON, the processor gets stuck in an infinite loop with the following error:
ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] failed to process session due to java.lang.RuntimeException: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]
16:45:35 UTC WARNING c09f4c27-0160-1000-6c29-1a31afc5a8d4: ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] Processor Administratively Yielded for 1 sec due to processing failure
Processor FetchS3Object: irrespective of the value set for "Object Key", it always picks the value of ${filename}. For example, if "Object Key" is set to "${Newfilename}", it ignores that value and uses ${filename} only.
Is it possible to refer to a flowfile from previous processors? My use case is FetchS3Object(file1) -> EvaluateJsonPath -> FetchS3Object(file2) -> PutHDFS -> FetchS3Object(file1) -> PutHDFS. Instead of loading file1 multiple times, is it possible to store it and refer to it throughout the flow?
In the flow above, file1 and file2 form one unit. Is there any option to copy both files or fail both?
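A common way to get both-or-nothing behaviour outside NiFi is to write the pair into a staging directory and then publish it with a single directory rename, which HDFS (like a POSIX filesystem) performs atomically. The sketch below uses the local filesystem as a stand-in for HDFS; `copy_pair_atomically` and the `.staging-` prefix are hypothetical names, not part of NiFi or Hadoop.

```python
import os
import shutil
import tempfile


def copy_pair_atomically(src_paths, dest_dir):
    """Copy every file in src_paths into dest_dir, or copy none of them.

    Hypothetical helper: files are first written to a hidden staging
    directory next to dest_dir, and only when every copy succeeds is the
    staging directory renamed to dest_dir in one rename call.
    """
    if os.path.exists(dest_dir):
        raise FileExistsError(dest_dir)
    parent = os.path.dirname(os.path.abspath(dest_dir))
    staging = tempfile.mkdtemp(prefix=".staging-", dir=parent)
    try:
        for src in src_paths:
            shutil.copy2(src, os.path.join(staging, os.path.basename(src)))
        # Single rename: readers see either no dest_dir or the complete set.
        os.rename(staging, dest_dir)
    except Exception:
        # Any failure discards the partial staging directory, then re-raises.
        shutil.rmtree(staging, ignore_errors=True)
        raise
```

A consumer that only looks at the final directory then sees either both files or neither; if any copy fails, the staging directory is removed and the original exception propagates.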
The ListS3 processor lists files based on their timestamp. If a file is loaded but then fails in a later step, it needs to be loaded again for reprocessing. One option is to update the file's timestamp so that ListS3 picks it up again on the next poll. How do we update the timestamp of a file in S3? Or are there other options for handling use cases like this?
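S3 has no call that only updates a timestamp, but copying an object onto itself rewrites it and gives it a new LastModified value. A minimal sketch, assuming `s3` is a boto3 S3 client and assuming ListS3 will list an object again once its LastModified is newer than the state it has stored; `touch_s3_object` is a hypothetical name.

```python
def touch_s3_object(s3, bucket, key):
    """Copy an S3 object onto itself so that its LastModified changes.

    `s3` is assumed to be a boto3 S3 client (anything with the same
    head_object/copy_object methods works). S3 rejects a self-copy that
    changes nothing, so MetadataDirective="REPLACE" is used, and the
    existing user metadata and Content-Type are read first and written
    back. Other system metadata (e.g. Cache-Control) is not carried over
    in this sketch.
    """
    head = s3.head_object(Bucket=bucket, Key=key)
    return s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        MetadataDirective="REPLACE",
        Metadata=head.get("Metadata", {}),
        # S3's default content type, used only if the head response lacks one.
        ContentType=head.get("ContentType", "binary/octet-stream"),
    )
```

For example, `touch_s3_object(boto3.client("s3"), "my-bucket", "data/file1.json")`, where the bucket and key are placeholders. A single `copy_object` call handles objects up to 5 GB; larger objects need a multipart copy.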
Some processors emit a failure event, which you have to map to a failure relationship. Others, like the EvaluateJsonPath processor, go into an infinite loop on the message. This is an open issue with a few processors.
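Until the processor itself routes bad input to failure, one workaround is to check that the content parses as JSON before it reaches ConvertJSONToAvro (for example in an ExecuteScript step) and send anything invalid down a separate path. The check itself is small; here it is as plain Python, with `route_by_json_validity` as a hypothetical name and the NiFi session plumbing left out.

```python
import json


def route_by_json_validity(content: bytes) -> str:
    """Return "valid" if content parses as UTF-8 JSON, otherwise "invalid".

    Routing bad payloads away before conversion means they never reach
    a processor that keeps retrying the same flowfile.
    """
    try:
        json.loads(content.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "invalid"
    return "valid"
```

`route_by_json_validity(b'{"a": 1 "b": 2}')` returns "invalid": the missing comma between object entries is the same kind of error as in the log in the question.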
Here's an article describing the process to build a custom NiFi processor: https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html
If you want to take existing processors/code and modify them for your own purpose, many of the standard processors can be found here: https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard
Refer to the implementation of FetchS3Object here for more clarification.
Use NiFi's UpdateAttribute processor to add attributes to your flowfile. See https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.attributes.UpdateAttribute/index.html
The above UpdateAttribute processor should solve your problem.
Add a failure relationship to LoadS3, pass the failed flowfile through an ExecuteScript processor, transform it accordingly, and feed it back to the LoadS3 processor.
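Looping failures back without a limit can recreate the infinite-loop problem from the question, so a loop like this usually carries a retry counter. The sketch below is plain Python showing the routing decision such an ExecuteScript step might make; the `retry.count` attribute name and the `MAX_RETRIES` value are hypothetical, and the NiFi session and relationship plumbing is omitted.

```python
from typing import Dict, Tuple

MAX_RETRIES = 3  # hypothetical limit; tune for your flow


def next_route(attributes: Dict[str, str]) -> Tuple[str, Dict[str, str]]:
    """Decide where a failed flowfile goes next.

    NiFi attribute values are strings, so the hypothetical "retry.count"
    attribute is parsed and written back as a string. Returns the route
    name and a new attribute dict; the input dict is not modified.
    """
    count = int(attributes.get("retry.count", "0")) + 1
    updated = dict(attributes)
    updated["retry.count"] = str(count)
    if count > MAX_RETRIES:
        return "give_up", updated
    return "retry", updated
```

With `MAX_RETRIES = 3`, a flowfile is sent back three times and routed to "give_up" on its fourth failure, where it can be parked for manual inspection instead of looping forever.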