edjm

Reputation: 5522

Apache NiFi - When using SplitText on large files, how can I make PutFile write out immediately?

I am reading in text files with 50k rows of data, where each row represents a complete record.

Our NiFi flow uses SplitText to handle the file in batches of 1000 rows. (I'm told this was set up before my time to work around memory issues.)

Is it possible to have PutFile execute immediately? I want PutFile to write each record out as soon as it is done, rather than having records sit in a queue until all 50k+ rows of data have been processed. It seems wasteful to wait if the file is being split up anyway.

I have been reading the documentation, but I cannot tell whether this behavior is by design and not configurable.

I'd appreciate any pointers to documentation that can help me answer this or configure my flow.

Upvotes: 0

Views: 1124

Answers (1)

mattyb

Reputation: 12093

TL;DR: A workaround is to use two SplitText processors in series, the first splitting into 10,000-row chunks (for example), the second splitting those into 1,000-row chunks. The first 10k rows will then be split into 10 flow files and sent downstream while the second 10k rows are still being processed by the second SplitText.
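A minimal sketch of that flow ("Line Split Count" is the relevant SplitText property; GetFile and the chunk sizes are just assumptions for illustration):

    GetFile
      -> SplitText (Line Split Count = 10000)
      -> SplitText (Line Split Count = 1000)
      -> PutFile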

EDIT: Adding another workaround, a Groovy script to be used in InvokeScriptedProcessor:

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.OutputStreamCallback

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def REL_ORIGINAL = new Relationship.Builder().name("original").description('After processing, the original incoming FlowFiles are routed here').build()
    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS, REL_ORIGINAL] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Two sessions: session1 owns the incoming flow file, session2 owns the outgoing splits,
        // so each split can be committed (and sent downstream) without committing the input.
        def session1 = sessionFactory.createSession()
        def session2 = sessionFactory.createSession()
        try {
            def inFlowFile = session1.get()
            if (!inFlowFile) return
            def inputStream = session1.read(inFlowFile)
            inputStream.eachLine { line ->
                // Write each line to its own flow file and commit right away,
                // making it available to the next processor immediately
                def outFlowFile = session2.create()
                outFlowFile = session2.write(outFlowFile, { outputStream ->
                    outputStream.write(line.bytes)
                } as OutputStreamCallback)
                session2.transfer(outFlowFile, REL_SUCCESS)
                session2.commit()
            }
            inputStream.close()
            // The original flow file is committed only after all lines have been sent out
            session1.transfer(inFlowFile, REL_ORIGINAL)
            session1.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session2.rollback(true)
            session1.rollback(true)
            throw t
        }
    }
}
processor = new GroovyProcessor()
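The trick in the script above is the pair of process sessions: each one-line flow file is created, transferred, and committed in its own session (session2), so it heads downstream immediately, while the incoming flow file stays open in session1 and is only committed after every line has been sent out. Note this is a sketch rather than a hardened processor: as written it emits one flow file per line, so you would adapt the loop (e.g. buffering 1000 lines per outgoing flow file) to match the 1000-row batches in the question.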

For completeness:

The Split processors were designed to support the Split/Merge pattern, and in order to merge the splits back together later, each split needs the same "parent ID" as well as the total count.

If you send flow files out before you've split everything up, you won't know the total count, so you won't be able to merge them back together later. Also, if something goes wrong during split processing, you may want to "rollback" the operation rather than have some flow files already downstream and the rest of them sent to failure.
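As a concrete illustration, SplitText records this bookkeeping as standard fragment attributes on each split flow file, which MergeContent (in Defragment mode) uses to reassemble the original; the values below are made up for illustration:

    fragment.identifier = b9e21d6f-...   (the shared "parent ID" for all splits of one file)
    fragment.index      = 3              (this split's position within the original)
    fragment.count      = 50             (total splits; unknown until splitting completes)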

In order to send out some flow files before all processing is done, you have to "commit the process session". This prevents you from doing the things above, and it creates a break in the provenance of the incoming flow file, since you have to commit/transfer that file in the session that originally took it in. All subsequent commits need newly created flow files, which breaks the provenance/lineage chain.

Although there is an open Jira for this (NIFI-2878), there has been some dissent on the mailing lists and in pull requests about adding this feature to processors that accept input (i.e. non-source processors). NiFi's framework is fairly transactional, and this kind of feature flies in the face of that.

Upvotes: 3
