Reputation: 21
I'm writing a custom processor in NiFi 1.3.
The processor executes an SQL query, reads the result set, transforms every row into a JSON document, and stores the documents in an ArrayList; every 1000 documents (the fetchSize parameter) it writes the batch to a flowfile. This works, but all the flowfiles are sent at once at the end.
What I want is for each flowfile to be transferred independently, at the moment I call the transferFlowFile method, instead of everything being sent together when the onTrigger method finishes.
Here is the code:
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
        fileToProcess = session.get();
        // No flowfile available on a non-loop incoming connection: wait for the next trigger.
        if (fileToProcess == null && context.hasNonLoopConnection()) {
            return;
        }
    }

    final ResultSet resultSet = st.executeQuery();
    final ResultSetMetaData meta = resultSet.getMetaData();
    final int nrOfColumns = meta.getColumnCount();
    List<Map<String, Object>> documentList = new ArrayList<>();

    while (resultSet.next()) {
        final AtomicLong nrOfRows = new AtomicLong(0L);
        cpt++;
        // Map each column of the current row to a key/value pair.
        Map<String, Object> item = new HashMap<>();
        for (int i = 1; i <= nrOfColumns; i++) {
            int javaSqlType = meta.getColumnType(i);
            String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i)
                    : meta.getColumnName(i);
            Object value = resultSet.getObject(i);
            if (value != null) {
                item.put(nameOrLabel, value.toString());
            }
        }
        Document document = new Document(item);
        documentList.add(document);
        // Every fetchSize rows, write the accumulated documents to a new flowfile.
        if (fetchSize != 0 && cpt % fetchSize == 0) {
            FlowFile flowFile = session.create();
            transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
        }
    }

    // Flush any remaining documents into a final flowfile.
    if (!documentList.isEmpty()) {
        final AtomicLong nrOfRows = new AtomicLong(0L);
        FlowFile flowFile = session.create();
        transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
    }
}

public void transferFlowFile(FlowFile flowFile, ProcessSession session, List<Map<String, Object>> documentList,
        FlowFile fileToProcess, AtomicLong nrOfRows, StopWatch stopWatch) {
    // Serialize the current batch of documents as JSON into the flowfile content.
    flowFile = session.write(flowFile, out -> {
        ObjectMapper mapper = new ObjectMapper();
        IOUtils.write(mapper.writeValueAsBytes(documentList), out);
    });
    documentList.clear();
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    session.getProvenanceReporter().modifyContent(flowFile, "Retrieved " + nrOfRows.get() + " rows",
            stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    session.transfer(flowFile, REL_SUCCESS);
}
Upvotes: 2
Views: 1363
Reputation: 18630
Call session.commit() after session.transfer(flowFile, REL_SUCCESS).
Any flowfiles created since the last commit (or since the start of onTrigger, if nothing has been committed yet) are only sent downstream when the session is committed.
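Applied to the transferFlowFile method from the question, a minimal sketch (untested, and assuming the incoming fileToProcess flowfile, if one was pulled, is transferred or removed before the first commit) would be:

public void transferFlowFile(FlowFile flowFile, ProcessSession session, List<Map<String, Object>> documentList,
        FlowFile fileToProcess, AtomicLong nrOfRows, StopWatch stopWatch) {
    flowFile = session.write(flowFile, out -> {
        ObjectMapper mapper = new ObjectMapper();
        IOUtils.write(mapper.writeValueAsBytes(documentList), out);
    });
    documentList.clear();
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    session.getProvenanceReporter().modifyContent(flowFile, "Retrieved " + nrOfRows.get() + " rows",
            stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    session.transfer(flowFile, REL_SUCCESS);
    // Committing here pushes this flowfile downstream immediately instead of
    // waiting for onTrigger to return. Note that commit() will fail if a flowfile
    // obtained via session.get() (fileToProcess) has been neither transferred
    // nor removed yet, so account for it before committing.
    session.commit();
}

Keep in mind that committing once per batch adds some framework overhead compared to a single commit at the end, so it trades a bit of throughput for lower latency.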
Upvotes: 3