chetan

Reputation: 411

NiFi Read and Modify file contents

I am new to NiFi. I am working on a NiFi project that reads the contents of a file and does some ETL on it. The results need to be written to a different file.

I am getting the relationship not satisfied error:

MySpanishProcessor[id=b673bb80-0169-1000-2f8a-c22081380d29] MySpanishProcessor[id=b673bb80-0169-1000-2f8a-c22081380d29] failed to process session due to StandardFlowFileRecord[uuid=e8ee1374-ef25-43d5-b35e-ac76dba0955c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1554235475648-1, container=default, section=1], offset=0, transfer relationship not specified; Processor Administratively Yielded for 1 sec: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=e8ee1374-ef25-43d5-b35e-ac76dba0955c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1554235475648-1, container=default, section=1], offset=0, transfer relationship not specified

The code that I have written is:

@Tags({"spanish"})
@CapabilityDescription("Spanish processor")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MySpanishProcessor extends AbstractProcessor {
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_MATCH = new Relationship.Builder()
            .name("matched")
            .description("FlowFiles are routed to this relationship when the Regular Expression is successfully evaluated and the FlowFile is modified as a result")
            .build();
    public static final Relationship REL_NO_MATCH = new Relationship.Builder()
            .name("unmatched")
            .description("FlowFiles are routed to this relationship when no provided Regular Expression matches the content of the FlowFile")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_MATCH);
        relationships.add(REL_NO_MATCH);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }
    Table officeTable = null;
    Table legalEntitytable = null;
    Table citiesTable = null;
    Table joinOfOfficeLegalCityTable = null;
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        getLogger().debug("In the Trigger");
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
         //Lets read the file using the call back
        ArrayList<String> lineList= new ArrayList<>();
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                BufferedReader bufferedReader= new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    lineList.add(line);
                }
            }
        });

        FlowFile flowFile1=session.create();
        session.write(flowFile1, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                outputStream.write("No Data".getBytes());
            }
        });
//        session.getProvenanceReporter().modifyAttributes(flowFile1);
        session.transfer(flowFile1, REL_MATCH);//needs to be called to transfer
    }
}

Upvotes: 1

Views: 1515

Answers (2)

chetan

Reputation: 411

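After some trial and error, the following code worked. It writes the result back to the flow file obtained from session.get() and transfers that same flow file, so no flow file is left without a relationship.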

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    getLogger().debug("In the Trigger");
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    // Read the file line by line using the callback
    ArrayList<String> lineList = new ArrayList<>();
    final SpanishCodeFilePreprocessor spanishCodeFilePreprocessor = new SpanishCodeFilePreprocessor();
    try {
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    spanishCodeFilePreprocessor.identifyRecordTypeAndProcessIt(line);
                }
            }
        });
    }
    catch (Exception e)
    {
        getLogger().error(e.toString());
    }
    try {
        // session.write returns a new FlowFile reference; keep it for the transfer below
        flowFile = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                officeTable = spanishCodeFilePreprocessor.getOfficeTable();
                officeTable.write().csv(outputStream);
            }
        });
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_MATCH); // every flow file must be transferred or removed
    } catch (Exception e) {
        // Log the exception itself so the cause is visible in the bulletin
        getLogger().error("Exception in spanishProcessor", e);
        flowFile = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                String s = "Office Table size: 0";
                outputStream.write(s.getBytes());
            }
        });
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_NO_MATCH); // route failures to "unmatched"
    }


}

Upvotes: 0

Bryan Bende

Reputation: 18630

Every flow file must be accounted for: any flow file created with session.create or obtained from session.get must either be transferred or removed before the session is committed.

Any call to session.write or session.putAttribute returns a new flow file reference, and that returned reference is the one you must keep track of. So...

FlowFile flowFile1=session.create();
flowFile1 = session.write(flowFile1, new OutputStreamCallback() {

Then flowFile1 must be transferred.
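
To make the accounting rule concrete, here is a small plain-Java toy model of it. It is not the NiFi API: ToyFlowFile and ToySession are hypothetical names invented for this sketch, and the real session does far more. It only illustrates why the question's pattern (incoming flow file never transferred or removed) fails at commit, while removing the incoming flow file lets the commit succeed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the accounting rule only. None of these classes are NiFi's API;
// ToyFlowFile and ToySession are hypothetical names made up for this sketch.
class SessionAccountingDemo {

    /** Immutable stand-in for a flow file: an id plus its content. */
    static final class ToyFlowFile {
        final int id;
        final String content;

        ToyFlowFile(int id, String content) {
            this.id = id;
            this.content = content;
        }
    }

    /** Tracks which flow files still lack a destination. */
    static final class ToySession {
        // id -> relationship name; null means "not accounted for yet"
        private final Map<Integer, String> destinations = new HashMap<>();
        private int nextId = 0;

        ToyFlowFile create() {
            ToyFlowFile ff = new ToyFlowFile(nextId++, "");
            destinations.put(ff.id, null);
            return ff;
        }

        /** Returns a NEW reference; the argument itself is left unchanged. */
        ToyFlowFile write(ToyFlowFile ff, String content) {
            return new ToyFlowFile(ff.id, content);
        }

        void transfer(ToyFlowFile ff, String relationship) {
            destinations.put(ff.id, relationship);
        }

        void remove(ToyFlowFile ff) {
            destinations.remove(ff.id);
        }

        /** Fails if any flow file was neither transferred nor removed. */
        void commit() {
            for (Map.Entry<Integer, String> e : destinations.entrySet()) {
                if (e.getValue() == null) {
                    throw new IllegalStateException(
                            "flow file " + e.getKey() + ": transfer relationship not specified");
                }
            }
        }
    }

    /** Runs the flow; removeIncoming=false mirrors the code in the question. */
    static boolean runFlow(boolean removeIncoming) {
        ToySession session = new ToySession();
        ToyFlowFile incoming = session.create(); // stands in for session.get()
        ToyFlowFile out = session.create();
        out = session.write(out, "No Data");     // keep the returned reference
        session.transfer(out, "matched");
        if (removeIncoming) {
            session.remove(incoming);            // account for the incoming flow file
        }
        try {
            session.commit();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    static boolean questionPatternCommits() {
        return runFlow(false);
    }

    static boolean fixedPatternCommits() {
        return runFlow(true);
    }

    public static void main(String[] args) {
        System.out.println("question pattern commits: " + questionPatternCommits());
        System.out.println("fixed pattern commits: " + fixedPatternCommits());
    }
}
```

Running it prints "question pattern commits: false" and "fixed pattern commits: true". In a real processor, the corresponding fix is to call session.remove(flowFile) or session.transfer(flowFile, ...) on the flow file obtained from session.get().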

Upvotes: 0
