Reputation: 173
Does split()
operation on File object
buffer and execute each line or do all lines get loaded in memory at once? This is to understand the memory usage if the file happens to contain 100,000+ lines.
Is it fair for a transformer
to return void
? The usage is to compute a few logic from the payload & headers then add the computed value to headers. Is there a better way?
Thanks
UPDATE:
return IntegrationFlows.from(fileReadingMessageSource(), p -> p.poller(pollerSpec()))
.enrichHeaders(Collections.singletonMap(ERROR_CHANNEL, appErrorChannel))
.split() // process file by file
.log(INFO, message -> "Started File: " + message.getHeaders().get("file_name"))
.enrichHeaders(h -> h.headerFunction("foo", m -> integrationUtil.constructFoo())) // fooobject
.split(fileSplitterSpec()) // split file lines
.filter(payload -> !(payload instanceof FileSplitter.FileMarker), e -> e.discardChannel("aggregatorChannel"))
.log(INFO, message -> "Payload: " + message.getPayload())
.transform(barTransformer)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.log(INFO, message -> "Completed File: " + message.getHeaders().get("file_name"))
.aggregate()
.log(INFO, message -> "All Files Processed")
// .handle(null)
.get();
Upvotes: 0
Views: 152
Reputation: 121347
Yes, that was really a purpose of the FileSplitter
. It's internal logic is based on the FileIterator
, which reads line by line an emits it to the splitter output channel.
No, the transformer cannot return void
. That's not its purpose. Sounds more like an enrichHeaders()
is better for you. Messages are immutable and you just cannot modify the current message for possible further logic. You build a new message with new data (or header) and emit it as a reply downstream the flow.
Upvotes: 1