Reputation: 2431
I am using Spring Integration to process files from some directories, and each file goes through a "flow". I would like to set up the overall processing so that a file poller monitors one or more directories for new files. Once the poller has picked up a new file, it should pass it to the subsequent components in the flow, where the file is processed without holding up the polling process.
The second aspect is that every new file goes through a few steps before it is aggregated by an aggregator based on some criterion, e.g. the number of files (the criterion differs between directories). Once enough files have accumulated, they are released from the aggregator and then processed by some time-consuming steps after the aggregator. So the overall process looks like this:
file-A picked up
file-A passed from poller to step1
file-A passed from step1 to aggregator
file-B picked up
file-B passed from poller to step1
file-B passed from step1 to aggregator
file-C picked up
file-C passed from poller to step1
file-C passed from step1 to aggregator
files A, B and C are released from aggregator
files A, B and C are processed by final-step
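To make the release criterion concrete, here is a minimal plain-Java model of a count-based aggregator. It is not Spring Integration code: files are grouped by a correlation key, assumed here to be the source directory, and a group is released once it reaches that key's configured size. The class name `CountingAggregator` and the per-directory thresholds are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of a count-based aggregator (hypothetical; not Spring Integration code).
// Files are grouped by correlation key and a group is released at that key's size.
class CountingAggregator {
    private final Map<String, Integer> releaseSizes;           // hypothetical per-directory thresholds
    private final Map<String, List<String>> groups = new HashMap<>();

    CountingAggregator(Map<String, Integer> releaseSizes) {
        this.releaseSizes = releaseSizes;
    }

    // Adds a file to its group; returns the complete group when it is released.
    synchronized Optional<List<String>> add(String correlationKey, String file) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(file);
        if (group.size() >= releaseSizes.getOrDefault(correlationKey, 1)) {
            groups.remove(correlationKey);   // the next file for this key starts a fresh group
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CountingAggregator agg = new CountingAggregator(Map.of("dirA", 3));
        for (String f : List.of("file-A", "file-B", "file-C")) {
            // prints "released [file-A, file-B, file-C]" once, after file-C
            agg.add("dirA", f).ifPresent(g -> System.out.println("released " + g));
        }
    }
}
```

Because the thresholds live in a map keyed by directory, the "criterion changes across directories" part of the question reduces to configuring a different size per key in this model.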
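So overall there are two requirements:
1. Polling must not be held up: each new file should be handed off and processed in its own thread, so several files can be processed in parallel.
2. Once the aggregator releases a group, the final step for that group must finish before the next group with the same correlation id is processed.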
To satisfy requirement #1, I simply put a queue channel after the file poller: new files are dropped onto the queue and step-a picks them up from it. This decouples the polling process, and the idea was to use a thread executor in the step-a service activator so that each file is processed in its own thread.
The second requirement was handled automatically by executing final-step after the aggregator, in the same thread as the aggregator. Because the aggregator takes a lock based on the correlation id, if another group is released for the same correlation id, it simply waits until the previous group with that id has been processed.
The problem I ran into is that #1 wasn't being fulfilled: the service activator waited until the first thread completed before starting another thread for the second file. That defeats the purpose of having a thread executor on the service activator, since it only creates the second thread after the first one has finished. To fix this, I replaced the queue channel with a dispatcher channel and placed the executor on the dispatcher channel. Now each file is processed in a separate thread, and multiple files are processed at the same time.
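The difference between the two setups can be modelled in plain Java. This is not Spring Integration code: it assumes the queue was effectively drained by one thread at a time (modelled as a single-thread executor) and that the executor on the dispatcher channel behaves like a fixed thread pool. `DispatchDemo` and `maxConcurrency` are hypothetical names, and the sleep stands in for step1.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model (hypothetical; not Spring Integration code) of two dispatch styles.
class DispatchDemo {

    // Runs `files` simulated step1 calls on `executor` and returns the highest
    // number of calls that were in progress at the same time.
    static int maxConcurrency(ExecutorService executor, int files) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        for (int i = 0; i < files; i++) {
            executor.submit(() -> {
                max.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(200);   // stand-in for the per-file work in step1
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // One consumer thread: files are processed strictly one after another.
        System.out.println("single consumer: " + maxConcurrency(Executors.newSingleThreadExecutor(), 3));
        // A pool of 3 threads: the files can be processed concurrently.
        System.out.println("pool of 3:       " + maxConcurrency(Executors.newFixedThreadPool(3), 3));
    }
}
```

The single consumer always reports 1; the pool typically reports 3, since all three sleeps overlap.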
For the second part, since the components after the aggregator are time-consuming, I wanted to decouple them from the first part, so I placed a queue channel after the aggregator. With this approach, however, the locking behavior I previously got from the aggregator is gone, because the thread that released the messages from the aggregator completes as soon as it hands them to the queue channel, before the time-consuming final step runs.
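The loss of locking can be reproduced with a small plain-Java model (not Spring Integration code). A `ReentrantLock` stands in for the aggregator's per-correlation lock and a single-thread executor stands in for the consumer of the queue channel. A lock is owned by the thread that acquired it, so a final step that runs on another thread is never protected by it. `HandOffDemo` and `finalStepRunsUnderLock` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

// Plain-Java model (hypothetical; not Spring Integration code) of releasing a group
// under a lock and either running the final step inline or handing it to a queue.
class HandOffDemo {

    // Returns true if the simulated final step ran while holding the correlation lock.
    static boolean finalStepRunsUnderLock(boolean handOffToQueue) {
        ReentrantLock correlationLock = new ReentrantLock();   // stand-in for the aggregator's lock
        Callable<Boolean> finalStep = correlationLock::isHeldByCurrentThread;
        ExecutorService queueConsumer = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> handedOff = null;
            boolean inline = false;
            correlationLock.lock();                 // the releasing thread holds the lock...
            try {
                if (handOffToQueue) {
                    handedOff = queueConsumer.submit(finalStep);   // ...but only enqueues the work
                } else {
                    inline = finalStep.call();                    // ...and runs the final step itself
                }
            } finally {
                correlationLock.unlock();           // releasing thread is done; lock is freed
            }
            return handOffToQueue ? handedOff.get() : inline;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            queueConsumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("final step on releasing thread: " + finalStepRunsUnderLock(false)); // true
        System.out.println("final step after a queue:       " + finalStepRunsUnderLock(true));  // false
    }
}
```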
Any thoughts on the overall process? How can I satisfy both requirements while still running things in parallel?
Thanks
Upvotes: 0
Views: 278
Reputation: 174494
It's not at all clear what your question is. Yes, for the downstream flow to run under the lock, it must run on the final "releasing" thread (the thread processing the last inbound message, the one that completes the group); you can't use a queue or executor channel downstream of the aggregator.
However, this has no impact on the threads from the executor channel; groups with a different correlation id will still be processed. Only if you are using the same correlation id for the "next" group will its threads block.
If you are saying you want to assemble the next group (with the same correlation id) while the first one is being processed downstream, you'll have to use some other mechanism to enforce single-threading downstream, such as an executor channel with a single-thread executor, or another lock registry.
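The lock-registry option can be sketched as a plain-Java model, assuming the correlation id is a `String` and a fixed pool of four threads runs the final step. This is not Spring Integration's lock registry API; `KeyedFinalStep` and its methods are hypothetical. The aggregator's releasing thread only calls `submit` and returns, so it is free to assemble the next group, while groups with the same id are still processed one at a time and groups with different ids can overlap.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Plain-Java model (hypothetical; not Spring Integration code) of a final step guarded
// by its own per-correlation-id lock, separate from the aggregator's lock.
class KeyedFinalStep {
    private final Map<String, Lock> locks = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> active = new ConcurrentHashMap<>();
    private final AtomicInteger maxPerKey = new AtomicInteger();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Called by the aggregator's releasing thread: enqueue and return at once.
    void submit(String correlationId, List<String> group) {
        pool.submit(() -> process(correlationId, group));
    }

    private void process(String correlationId, List<String> group) {
        Lock lock = locks.computeIfAbsent(correlationId, k -> new ReentrantLock());
        lock.lock();   // groups with the same id wait here; other ids proceed
        try {
            int now = active.computeIfAbsent(correlationId, k -> new AtomicInteger()).incrementAndGet();
            maxPerKey.accumulateAndGet(now, Math::max);
            Thread.sleep(100);   // stand-in for the time-consuming final step on `group`
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            active.get(correlationId).decrementAndGet();
            lock.unlock();
        }
    }

    // Waits for queued groups to finish; returns the highest per-id concurrency seen.
    int shutdownAndGetMaxPerKey() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxPerKey.get();
    }

    public static void main(String[] args) {
        KeyedFinalStep step = new KeyedFinalStep();
        step.submit("dirA", List.of("file-A", "file-B", "file-C"));
        step.submit("dirA", List.of("file-D", "file-E", "file-F"));
        step.submit("dirB", List.of("file-X", "file-Y"));
        System.out.println("max concurrent groups per id: " + step.shutdownAndGetMaxPerKey()); // 1
    }
}
```

The single-thread-executor option is simpler but serializes every group, whatever its correlation id. This model also never removes entries from its lock map; in a real flow, Spring Integration's own lock registries (such as `DefaultLockRegistry`) may be a better fit, but check their documentation for how keys map to locks.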
Upvotes: 0