Tilak

Reputation: 363

Apache NiFi 1.4.0 questions and issues

I am trying to copy data from S3 to HDFS. I have observed a couple of issues and have a few questions.

Issues

  1. Processor ConvertJSONToAvro - If the flowfile is not valid JSON, the processor gets stuck in an infinite loop with the following error.

    ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] failed to process session due to java.lang.RuntimeException: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
     at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
     at [Source: org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@2ad7d50d; line: 8, column: 14]
    
    16:45:35 UTC
    WARNING
    c09f4c27-0160-1000-6c29-1a31afc5a8d4
    
    ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] Processor Administratively Yielded for 1 sec due to processing failure
    
  2. Processor FetchS3Object - Irrespective of the value set for "Object key", it always picks the value of ${filename}. For example, if "Object key" is set to "${Newfilename}", it ignores that value and uses ${filename} only.

Questions

  1. Is it possible to refer to a flowfile from previous processors? My use case is FetchS3Object(file1) -> EvaluateJsonPath -> FetchS3Object(file2) -> PutHDFS -> FetchS3Object(file1) -> PutHDFS. In this case, instead of loading file1 multiple times, is it possible to store it and refer to it throughout the flow?

  2. In the above point, file1 and file2 are one unit. Are there any options to either copy both files or fail for both?

  3. The ListS3 processor loads files based on timestamp. If a file is loaded and then fails in any other step, it needs to be loaded again for reprocessing. One option is to update the timestamp of the file so that it will be available to ListS3 during the next poll. How do we update the timestamp of a file in S3? Or are there any other options to handle use cases like this?

Upvotes: 0

Views: 605

Answers (1)

Arnab Biswas

Reputation: 1002

Issues

  1. Some processors emit a failure event, which you would have to map to a failure relationship. Others, like the EvaluateJsonPath processor, go into an infinite loop for the message. This is an open issue with a few processors.

    Here's an article describing the process to build a custom NiFi processor: https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html

    If you want to use existing processors/code and modify for your own purpose, then many of the standard processors can be found here: https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard

  2. Refer to the implementation of FetchS3Object here for more clarification.
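
For issue 1, one possible workaround (an assumption on my part, not something the processor offers out of the box) is to check that the content parses as JSON before it reaches ConvertJSONToAvro, and route anything that does not parse to a separate failure path. The sketch below is plain Python 3 and shows only the check itself; the function name `classify_json` is hypothetical, and wiring it into a scripted processor is left out.

```python
import json


def classify_json(payload: bytes):
    """Return ("valid", None) if payload parses as JSON,
    otherwise ("invalid", reason) with the parser's message."""
    try:
        json.loads(payload.decode("utf-8"))
    except ValueError as exc:
        # Both UnicodeDecodeError and json.JSONDecodeError subclass ValueError.
        return "invalid", str(exc)
    return "valid", None


if __name__ == "__main__":
    # The second sample is missing a comma between entries,
    # similar to the error reported in the question.
    for sample in (b'{"a": 1, "b": 2}', b'{"a": 1 "b": 2}'):
        print(classify_json(sample))
```

Flowfiles classified as invalid could then be sent to a dead-letter location instead of being retried indefinitely.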

Questions

  1. Use NiFi's UpdateAttribute processor to add attributes to your flowfile. Refer to this: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.attributes.UpdateAttribute/index.html

  2. The above UpdateAttribute processor should solve your problem.

  3. Add a failure relationship to LoadS3, then pass it through an ExecuteScript processor, transform it accordingly, and feed it back to the LoadS3 processor.
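
On the "how do we update the timestamp of a file in S3" part of question 3: S3 has no direct "touch" operation, but copying an object onto itself with replaced metadata writes it again and gives it a new LastModified value. Below is a minimal sketch of that approach, assuming a boto3 S3 client is passed in; the function name `touch_s3_object` is hypothetical. Note that a single `copy_object` call is limited to objects up to 5 GB, and that `MetadataDirective="REPLACE"` discards existing metadata unless it is re-supplied, which is why the sketch reads it first with `head_object`.

```python
def touch_s3_object(client, bucket, key):
    """Copy an S3 object onto itself so that S3 assigns a new LastModified.

    `client` is assumed to be a boto3 S3 client, e.g. boto3.client("s3").
    """
    # Read the current metadata so the REPLACE directive does not lose it.
    head = client.head_object(Bucket=bucket, Key=key)
    response = client.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        Metadata=head.get("Metadata", {}),
        ContentType=head.get("ContentType", "binary/octet-stream"),
        # A self-copy is rejected unless something about the object changes.
        MetadataDirective="REPLACE",
    )
    return response["CopyObjectResult"]["LastModified"]


# Usage (requires boto3 and AWS credentials; bucket and key are placeholders):
#   import boto3
#   touch_s3_object(boto3.client("s3"), "my-bucket", "path/to/file1")
```

Whether ListS3 then picks the object up again depends on how it tracks already-listed entries, so this should be tested against your flow before relying on it.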

Upvotes: 1
