Reputation: 81
The code below accepts 2 messages before proceeding to the outbound channel.
<bean id="timeout"
      class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    <constructor-arg name="threshold" value="2" />
    <constructor-arg name="timeout" value="7000" />
</bean>
<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput"
                method="handleMessage" release-strategy="releaseStrategyBean" release-strategy-method="timeout">
</int:aggregator>
My use case is to collate all the messages for 10 minutes and then send them to the outbound channel, not to release them based on the message count as shown above. To implement this time-based functionality, I used the code below:
<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput"
                method="handleMessage"
                output-channel="outputappendFilenameinHeader" >
</int:aggregator>
<bean id="updateCreate" class="helper.UpdateCreateHelper"/>
I passed 10 messages, and the PojoDateStrategyHelper canRelease method was invoked 10 times.
I implemented PojoDateStrategyHelper with time-difference logic, and that part works as expected. After 10 minutes the UpdateCreateHelper class is called, but it receives only 1 message (the last one). The remaining 9 messages are not seen anywhere. Am I doing anything wrong here? The messages are not being collated.
I suspect there should be something built into SI that can achieve this: if I pass 10 minutes as a parameter, then once the 10 minutes expire, it should pass all the messages on to the outbound channel.
This is my UpdateCreateHelper.java code:
public Message<?> handleMessage(List<Message<?>> flights) {
    LOGGER.debug("orderItems list ::" + flights.size()); // this is always printing 1
    MessageBuilder<?> messageWithHeader = MessageBuilder.withPayload(flights.get(0).getPayload().toString());
    messageWithHeader.setHeader("ftp_filename", "");
    return messageWithHeader.build();
}

@CorrelationStrategy
public String correlateBy(@Header("id") String id) {
    return id;
}

@ReleaseStrategy
public boolean canRelease(List<Message<?>> flights) {
    LOGGER.debug("inside canRelease ::" + flights.size()); // This is called for each and every message
    return compareTime(date.getTime(), new Date().getTime());
}
I am new to SI (v3.x). I searched a lot for a time-bound aggregator but couldn't find any useful source. Please suggest.
Thanks!
Upvotes: 1
Views: 1558
Reputation: 81
private String correlationId = date.toString();

@CorrelationStrategy
public String correlateBy(Message<?> message) {
    // Return the same correlation ID for every message so that they all join one group.
    // (The ID could instead be the timestamp at which the current window started.)
    return "same";
}
Earlier I was returning the header id, which differs from message to message, so every message ended up in its own group. I hope this solution helps someone. I wasted almost two days by overlooking such a small concept.
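To make the effect of the correlation key concrete, here is a minimal plain-Java sketch. It is not Spring Integration code; the class `CorrelationDemo` and its `group` method are hypothetical names used only for illustration. It files ten ids into groups, once keyed by the id itself and once keyed by a constant.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; this is not Spring Integration code.
class CorrelationDemo {

    // Files each id under the key returned by the correlation function,
    // mimicking how an aggregator puts a message into its correlation group.
    static Map<String, List<String>> group(List<String> ids, Function<String, String> correlate) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String id : ids) {
            groups.computeIfAbsent(correlate.apply(id), k -> new ArrayList<>()).add(id);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add("id-" + i);
        }
        // Keyed by a unique id: ten groups with one message each.
        System.out.println(group(ids, x -> x).size());
        // Keyed by a constant: one group holding all ten messages.
        System.out.println(group(ids, x -> "same").get("same").size());
    }
}
```

With a unique key per message, whichever group is released can only ever contain one message, which matches the "always printing 1" symptom in the question.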
Upvotes: 1
Reputation: 174484
Turn on DEBUG logging to see why you only see one message.
"I suspect there should be something built into SI that can achieve this, ..."
Prior to version 4.0 (and, by default, after), the aggregator is a completely passive component; the release strategy is only consulted when a new message arrives.
4.0 added group timeout capabilities whereby partial groups can be released (or discarded) after a timeout.
However, with any version, you can configure a MessageGroupStoreReaper to release partially complete groups after some timeout. See the documentation.
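The difference between a passive aggregator and a reaper can be sketched in plain Java. This is only a simulation under assumed names (`WindowedGroup`, `onMessage`, `reap` are hypothetical and not part of the Spring Integration API), with the clock passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation only; none of these names are Spring Integration API.
class WindowedGroup {
    private final long timeoutMillis;
    private final List<String> messages = new ArrayList<>();
    private long firstArrival = -1;

    WindowedGroup(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Passive path: the release check runs only when a message arrives.
    // Returns the released batch, or an empty list if the window is still open.
    List<String> onMessage(String payload, long now) {
        if (messages.isEmpty()) {
            firstArrival = now; // the window starts with the first message
        }
        messages.add(payload);
        return releaseIfExpired(now);
    }

    // Reaper path: a scheduler would call this periodically, so the group
    // is released even when no further message arrives.
    List<String> reap(long now) {
        return releaseIfExpired(now);
    }

    private List<String> releaseIfExpired(long now) {
        if (messages.isEmpty() || now - firstArrival < timeoutMillis) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(messages);
        messages.clear();
        return batch;
    }
}
```

If only `onMessage` is ever called, a group whose last message arrived before the timeout stays buffered until another message shows up; the periodic `reap` call is what closes that gap.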
Upvotes: 1