Lone Developer

Reputation: 616

Mule File Inbound Flow: Control Number of Threads

I want to control the number of threads used by the file inbound endpoint and the message processors. Suppose I have 5 files in my input directory: I should be able to process 2 files at a time. Only once these files have been processed (that is, their content has been handled by the message processors) should the next files be picked up. I tried using the synchronous processing strategy at flow level, but then only one file is processed at a time. I want multiple threads, with each thread handling a message all the way from receiving the file to sending the response. I also tried the method David suggested, but it does not work either: it still picks up only one file at a time.

<flow name="fileInboundTestFlow2" doc:name="fileInboundTestFlow2" processingStrategy="synchronous">
    <poll frequency="1000">
        <component class="FilePollerComponent" doc:name="File Poller"></component>
    </poll>
    <collection-splitter />
    <request-reply>
        <vm:outbound-endpoint path="out"/>
        <vm:inbound-endpoint path="response">
            <collection-aggregator/>
        </vm:inbound-endpoint>
    </request-reply>
    <file:outbound-endpoint path="E:/fileTest/processed" />
</flow>

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;

public class FilePollerComponent implements Callable {

    private String pollDir = "E://fileTest";

    // Maximum number of files returned by a single poll
    private int numberOfFiles = 3;

    public String getPollDir() {
        return pollDir;
    }

    public void setPollDir(String pollDir) {
        this.pollDir = pollDir;
    }

    public int getNumberOfFiles() {
        return numberOfFiles;
    }

    public void setNumberOfFiles(int numberOfFiles) {
        this.numberOfFiles = numberOfFiles;
    }

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        File dir = new File(pollDir);
        if (!dir.isDirectory()) {
            throw new Exception("Invalid Directory");
        }
        List<File> filesToReturn = new ArrayList<File>(numberOfFiles);
        File[] files = dir.listFiles();
        if (files == null) {
            // listFiles() returns null on an I/O error
            return filesToReturn;
        }
        for (File file : files) {
            // Stop once the batch is full; count only regular files, not subdirectories
            if (filesToReturn.size() >= numberOfFiles) {
                break;
            }
            if (file.isFile()) {
                filesToReturn.add(file);
            }
        }
        return filesToReturn;
    }
}

Upvotes: 1

Views: 2322

Answers (1)

David Dossot

Reputation: 33413

The file inbound endpoint is a poller, so it uses one thread. If you make the flow synchronous, you're piggybacking on this single thread and therefore process one file at a time.

You need to create a flow processing strategy that allows only 2 threads. Use the following example, which allows 500 threads, as a starting point: http://www.mulesoft.org/documentation/display/current/Flow+Processing+Strategies#FlowProcessingStrategies-Fine-TuningaQueued-AsynchronousProcessingStrategy
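As a rough plain-Java analogy (this is not Mule configuration), a thread pool capped at 2 threads behaves much like a processing strategy limited to 2 threads: at most 2 tasks run at once, and a new one starts as soon as a thread frees up. The class name `BoundedPoolSketch`, the method `peakConcurrency`, and the 50 ms sleep standing in for file processing are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {

    /** Runs taskCount dummy tasks on a pool of maxThreads and returns the peak concurrency observed. */
    public static int peakConcurrency(int maxThreads, int taskCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(() -> {
                    // Record how many tasks are in flight right now
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(50); // hypothetical stand-in for processing one file
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            // Wait for every task to finish before reading the peak
            for (Future<?> f : futures) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        // 5 "files", at most 2 processed concurrently
        System.out.println("peak concurrency = " + peakConcurrency(2, 5));
    }
}
```

The observed peak never exceeds the pool size, but the pool keeps itself busy: it does not wait for a whole group to finish before starting the next task.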

EDIT: The above proposal doesn't satisfy this requirement:

If I configure 3 threads, then three files will be picked up from the input directory, and until these files are processed, no other file should be picked up.

Indeed, the above proposal would always have 3 files being processed in parallel instead of being processed in groups of 3.

So I'm proposing this alternative approach:

  • Configure the flow processing strategy to synchronous
  • Use a poll element as the source
  • Place a custom component in it, which selects 3 different files from a configurable directory. No need to lock anything because the synchronous strategy of the flow prevents re-entrance. Return a java.util.List of java.io.File objects.
  • Add a collection-splitter after it.
  • Add a request-reply with an aggregator in the inbound endpoint to implement a fork-join pattern (http://blogs.mulesoft.org/aggregation-with-mule-fork-and-join-pattern/). Processing of the files will happen in another flow, while the polling flow will block until all 3 files have been processed.
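The steps above can be sketched outside Mule with plain `java.util.concurrent`, purely to illustrate the fork-join behaviour: take a group of N items, hand them to worker threads, and block until the whole group is done before taking the next group. This is an analogy under stated assumptions, not the Mule implementation: `BatchForkJoinSketch`, `processInBatches`, and the `"processed:"` prefix are hypothetical, and `invokeAll` plays the role of the request-reply with its aggregator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchForkJoinSketch {

    /** Processes items in groups of batchSize and returns the results grouped per batch. */
    public static List<List<String>> processInBatches(List<String> items, int batchSize) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(batchSize);
        List<List<String>> batches = new ArrayList<>();
        try {
            for (int start = 0; start < items.size(); start += batchSize) {
                int end = Math.min(start + batchSize, items.size());
                // "Split": one task per item in the current group
                List<Callable<String>> tasks = new ArrayList<>();
                for (String item : items.subList(start, end)) {
                    tasks.add(() -> "processed:" + item); // hypothetical processing step
                }
                // "Join": invokeAll blocks until every task in the group has completed
                List<String> results = new ArrayList<>();
                for (Future<String> future : workers.invokeAll(tasks)) {
                    results.add(future.get());
                }
                batches.add(results);
            }
        } finally {
            workers.shutdown();
        }
        return batches;
    }

    public static void main(String[] args) throws Exception {
        List<String> files = Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt", "e.txt");
        System.out.println(processInBatches(files, 2));
    }
}
```

Because the loop only advances after `invokeAll` returns, no item from the next group starts while the current group is still being processed, which is the difference from simply capping the number of threads.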

Upvotes: 3
