adeelmahmood

Reputation: 2431

Aggregator release strategy that depends on whether another service activator is running

I understand how aggregating based on size works, but I also want the release strategy to depend on whether another step in the pipeline is still running. The idea is that I move files to a "source" directory, aggregate enough of them, move them from "source" to "stage", and then process the staged files. While that processing is running I don't want to put more files in "stage", but I do want to keep adding files to the "source" folder (that part is handled by a dispatcher channel connected to the file inbound adapter before the aggregator).

<int:aggregator id="filesBuffered" 
                input-channel="sourceFilesProcessed"
                output-channel="stagedFiles"
                release-strategy-expression="size() == 10" 
                correlation-strategy-expression="'mes-group'" 
                expire-groups-upon-completion="true" 
                />

<int:channel id="stagedFiles" />

<int:service-activator  input-channel="stagedFiles"
                        output-channel="readyForMes"
                        ref="moveToStage"   
                        method="move" />

So, as you can see, I don't want to release the aggregated messages while an existing invocation of the moveToStage service activator is still running.

I thought about making the stagedFiles channel a queue channel, but that doesn't seem right: I want the files to be passed to moveToStage as a Collection, not one at a time, which is what I assume would happen if stagedFiles were a queue channel. Instead, I want to reach a threshold (e.g. 10 files) and pass those to stagedFiles so that moveToStage can process them. Until that step is done, I want the aggregator to keep aggregating files, and then release everything it has aggregated.

Thanks

Upvotes: 0

Views: 927

Answers (1)

Artem Bilan

Reputation: 121292

I suggest keeping a flag as an AtomicBoolean bean (true while moveToStage is idle), updating it from your moveToStage#move method, and checking its state in the release strategy:

release-strategy-expression="size() >= 10 and @stagingFlag.get()" 
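
A minimal sketch of the moveToStage side, under some assumptions: the flag starts as true (idle), move clears it on entry and sets it back in a finally block, and the actual file move is represented by a hypothetical Consumer callback. The class name StagingFlagSketch, the mover callback, and the use of String file names are illustrative and not taken from the question; in the real application the AtomicBoolean would be registered as a Spring bean named stagingFlag so the expression above can resolve @stagingFlag.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch: names here are illustrative, not from the original post.
class StagingFlagSketch {

    /** Stand-in for the bean referenced by ref="moveToStage". */
    static class MoveToStage {
        // true = staging is idle, so the aggregator is allowed to release a group
        private final AtomicBoolean stagingFlag;
        // placeholder for the real "source" -> "stage" file move (assumption)
        private final Consumer<Collection<String>> mover;

        MoveToStage(AtomicBoolean stagingFlag, Consumer<Collection<String>> mover) {
            this.stagingFlag = stagingFlag;
            this.mover = mover;
        }

        Collection<String> move(Collection<String> files) {
            stagingFlag.set(false); // block further releases while staging runs
            try {
                mover.accept(files);
                return files;
            } finally {
                stagingFlag.set(true); // re-enable releases, even if the move failed
            }
        }
    }

    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(true); // assumed initial state: idle
        MoveToStage stage = new MoveToStage(flag,
                files -> System.out.println("moving " + files.size() + " files, flag=" + flag.get()));
        stage.move(List.of("a.txt", "b.txt"));
        System.out.println("done, flag=" + flag.get());
    }
}
```

If the staged files are processed further downstream (after readyForMes), the flag would probably need to be set back to true at the end of that processing rather than at the end of move. Also note that, as far as I know, the release strategy is only evaluated when a message arrives for the group, so a group that grew past 10 while staging was busy would be released on the next incoming file.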

Upvotes: 1
