Reputation: 381
I am using SI's aggregator and using custom aggregator and release strategy. PFB the snippet
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL).transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
System.out.println("time entered"+System.currentTimeMillis());
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).handle(uaImpl, "process").get();
Its going to corelate startegy class but then its directly going to the null channel, I am using JdbcMessage store. Its not going into h Release Strategy class. PFB the log -
2020-01-28T13:18:55.472-0600 DEBUG Executing prepared SQL query
2020-01-28T13:18:55.472-0600 DEBUG Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
2020-01-28T13:18:55.472-0600 DEBUG Fetching JDBC Connection from DataSource
2020-01-28T13:18:55.473-0600 DEBUG message sent to null channel: GenericMessage [payload=Flight [UA/1016/20200128/IAH0-CUN: status=null, EON=null(GMT: null), EIN=null(GMT: 2020-01-28 20:15:43), tail=null], headers={jms_redelivered=false, jms_destination=Queue[UAL.OPS.NOC.FLIGHTAWARE.FLIFO.1], id=03accd44-2e22-eb3d-cf0a-2c291f8562b6, priority=4, jms_timestamp=1580239135458, jms_messageId=ID:DNDCEM01.309B5DA0C7C1C27739:181824, timestamp=1580239135471}]
2020-01-28T13:18:55.473-0600 DEBUG postSend (sent=true) on channel 'uaEventFlow.channel#0', message: GenericMessage [payload=Flight [UA/1016/20200128/IAH0-CUN: status=null, EON=null(GMT: null), EIN=null(GMT: 2020-01-28 20:15:43), tail=null], headers={jms_redelivered=false, jms_destination=Queue[UAL.OPS.NOC.FLIGHTAWARE.FLIFO.1], id=03accd44-2e22-eb3d-cf0a-2c291f8562b6, priority=4, jms_timestamp=1580239135458, jms_messageId=ID:DNDCEM01.309B5DA0C7C1C27739:181824, timestamp=1580239135471}]
Some of the messages are getting processed but majority of the messages are going to the null channel.
Upvotes: 1
Views: 110
Reputation: 174769
I deleted my comment; you are correct; new messages for expired groups will be sent to the nullChannel
if there is no discard channel.
Set expireGroupsUponCompletion
to true
to expire groups so that a new group is started with a new message with the same correlation id.
Upvotes: 2