Reputation: 411
I am new to NiFi. I am working on a NiFi project that reads the contents of a file and does some ETL. The results need to be written to a different file.
I am getting a "transfer relationship not specified" error:
MySpanishProcessor[id=b673bb80-0169-1000-2f8a-c22081380d29] MySpanishProcessor[id=b673bb80-0169-1000-2f8a-c22081380d29] failed to process session due to StandardFlowFileRecord[uuid=e8ee1374-ef25-43d5-b35e-ac76dba0955c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1554235475648-1, container=default, section=1], offset=0, transfer relationship not specified; Processor Administratively Yielded for 1 sec: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=e8ee1374-ef25-43d5-b35e-ac76dba0955c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1554235475648-1, container=default, section=1], offset=0, transfer relationship not specified
The code that I have written is:
@Tags({"spanish"})
@CapabilityDescription("Spanish processor")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MySpanishProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_MATCH = new Relationship.Builder()
            .name("matched")
            .description("FlowFiles are routed to this relationship when the Regular Expression is successfully evaluated and the FlowFile is modified as a result")
            .build();

    public static final Relationship REL_NO_MATCH = new Relationship.Builder()
            .name("unmatched")
            .description("FlowFiles are routed to this relationship when no provided Regular Expression matches the content of the FlowFile")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_MATCH);
        relationships.add(REL_NO_MATCH);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    Table officeTable = null;
    Table legalEntitytable = null;
    Table citiesTable = null;
    Table joinOfOfficeLegalCityTable = null;

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        getLogger().debug("In the Trigger");
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Let's read the file using the callback
        ArrayList<String> lineList = new ArrayList<>();
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    lineList.add(line);
                }
            }
        });

        FlowFile flowFile1 = session.create();
        session.write(flowFile1, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                outputStream.write("No Data".getBytes());
            }
        });
        // session.getProvenanceReporter().modifyAttributes(flowFile1);
        session.transfer(flowFile1, REL_MATCH); // needs to be called to transfer
    }
}
Upvotes: 1
Views: 1515
Reputation: 411
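After some trial and error, the following code worked. The main difference is that it writes to and transfers the incoming flow file obtained from session.get(), instead of creating a second flow file and leaving the original one without a relationship.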
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    getLogger().debug("In the Trigger");
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    // Let's read the file using the callback
    ArrayList<String> lineList = new ArrayList<>();
    final SpanishCodeFilePreprocessor spanishCodeFilePreprocessor = new SpanishCodeFilePreprocessor();
    try {
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    spanishCodeFilePreprocessor.identifyRecordTypeAndProcessIt(line);
                }
            }
        });
    } catch (Exception e) {
        getLogger().error(e.toString());
    }

    try {
        // session.write returns a new FlowFile reference, so keep it
        flowFile = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                officeTable = spanishCodeFilePreprocessor.getOfficeTable();
                String s = "Office Table size: " + String.valueOf(officeTable.shape());
                officeTable.write().csv(outputStream);
            }
        });
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_MATCH); // needs to be called to transfer
    } catch (Exception e) {
        getLogger().error("Exception in spanishProcessor");
        flowFile = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream outputStream) throws IOException {
                String s = "Office Table size: 0";
                outputStream.write(s.getBytes());
            }
        });
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_NO_MATCH); // needs to be called to transfer
    }
}
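As a side note on the read callback above: new InputStreamReader(inputStream) decodes with the platform default charset, which can mangle accented characters in Spanish text if the file uses a different encoding. Below is a minimal plain-JDK sketch of the same line-reading loop with an explicit charset. The class name LineReadingSketch, the helper readLines, and the choice of UTF-8 are assumptions for illustration, not part of the original processor.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of NiFi or of the original processor.
final class LineReadingSketch {

    // Reads every line from the stream, decoding it as UTF-8 (an assumption
    // about the input file). The stream is not closed here because the
    // caller owns it.
    static List<String> readLines(InputStream in) {
        List<String> lines = new ArrayList<>();
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // "a\u00f1o" is "año"; the escape keeps this source file ASCII-only.
        byte[] data = "a\u00f1o\noficina".getBytes(StandardCharsets.UTF_8);
        System.out.println(readLines(new ByteArrayInputStream(data)).size()); // prints 2
    }
}
```

Inside a processor, the body of InputStreamCallback.process could delegate to a helper like this, passing the inputStream it receives.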
Upvotes: 0
Reputation: 18630
Every flow file must be accounted for: any flow file created by session.create or obtained from session.get must be either transferred or removed.
Any call to session.write or session.putAttribute returns a new flow file reference, and that returned reference is the one you must keep track of. So...
FlowFile flowFile1=session.create();
flowFile1 = session.write(flowFile1, new OutputStreamCallback() {
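Then flowFile1 must be transferred. The original flowFile obtained from session.get must also be transferred or removed (for example with session.remove(flowFile)), otherwise the session fails with "transfer relationship not specified".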
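To make the accounting rule concrete, here is a toy model in plain Java. It is not NiFi code and not how NiFi implements sessions: MiniSession, obtain, and commits are hypothetical names invented for this sketch, and flow files are reduced to integer ids. It only mimics the rule that every flow file a session hands out needs a transfer or a remove before commit.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT NiFi code. All names here are hypothetical.
final class SessionAccountingSketch {

    /** Stand-in for a process session; flow files are just integer ids. */
    static final class MiniSession {
        // flow file id -> relationship name (null while unaccounted for)
        private final Map<Integer, String> destinations = new HashMap<>();
        private int nextId = 0;

        /** Models session.get() and session.create(): hands out a new id. */
        int obtain() {
            int id = nextId++;
            destinations.put(id, null);
            return id;
        }

        /** Models session.transfer(flowFile, relationship). */
        void transfer(int id, String relationship) {
            requireKnown(id);
            destinations.put(id, relationship);
        }

        /** Models session.remove(flowFile). */
        void remove(int id) {
            requireKnown(id);
            destinations.remove(id);
        }

        /** Models the commit-time check that rejects unaccounted flow files. */
        void commit() {
            for (Map.Entry<Integer, String> e : destinations.entrySet()) {
                if (e.getValue() == null) {
                    throw new IllegalStateException(
                            "flow file " + e.getKey() + " transfer relationship not specified");
                }
            }
            destinations.clear();
        }

        private void requireKnown(int id) {
            if (!destinations.containsKey(id)) {
                throw new IllegalArgumentException("unknown flow file " + id);
            }
        }
    }

    /** Replays the question's pattern; returns true if the commit succeeds. */
    static boolean commits(boolean handleIncoming) {
        MiniSession session = new MiniSession();
        int incoming = session.obtain(); // like session.get()
        int created = session.obtain();  // like session.create()
        session.transfer(created, "matched");
        if (handleIncoming) {
            session.remove(incoming);    // the step missing from the question
        }
        try {
            session.commit();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(commits(false)); // prints false
        System.out.println(commits(true));  // prints true
    }
}
```

In the question's onTrigger, the equivalent fix is either to remove or transfer the incoming flowFile as well as flowFile1, or to write to the incoming flowFile and transfer that one.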
Upvotes: 0