Reputation: 133
Hi, we are trying to stream-process financial market data to calculate trading signals using Apache Camel or Spring Integration. One of our use cases is to aggregate consecutive prices based on their timestamps, as follows:
The input messages arrive as a time series of (timestamp, price) pairs. Each pair (TX,PX) is one message, where T is the timestamp and P is the price value. Suppose the values arrive as:
(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)...
Suppose we need to aggregate every 3 consecutive messages for further calculation. Given the input above, we need to produce the following groups, where each group of 3 pairs is one aggregated message:
[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....
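To make the required grouping concrete, here is a minimal plain-Java sketch of the sliding window described above. It is independent of Camel and Spring Integration; the class name `SlidingWindowSketch` and the method `windows` are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the desired grouping; not part of any framework.
final class SlidingWindowSketch {

    /** Returns every run of `size` consecutive elements, advancing one element at a time. */
    public static <T> List<List<T>> windows(List<T> input, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start + size <= input.size(); start++) {
            // Copy the sublist so each window is independent of the input list.
            result.add(new ArrayList<>(input.subList(start, start + size)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ticks = Arrays.asList("(T0,P1)", "(T1,P1)", "(T2,P2)", "(T3,P3)", "(T4,P4)");
        // Prints one line per group of 3 consecutive messages.
        for (List<String> window : windows(ticks, 3)) {
            System.out.println(window);
        }
    }
}
```

Every message except the first two and the last two lands in three windows, which is why a single correlation key per message is not enough on its own.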
As you can see, most messages are aggregated into more than one group. Can someone suggest a way to do this with the existing aggregator, without writing our own?
It seems that the Spring Integration aggregator also groups by correlation key, so each message would need to map to several correlation keys. However, the current API only seems to allow one correlation key per message, which means each message can be aggregated into only one group. Is there a workaround for this?
P.S.
After reading the Camel source code, it seems Camel cannot support our requirement, so I am trying my luck with Spring. Fingers crossed.
Upvotes: 1
Views: 1307
Reputation: 174749
We don't have anything out of the box, but I was able to do what you want with a small modification to the SimpleMessageStore. I have posted the full RollingMessageStore in a gist.
The bottom line is to modify removeGroup to only remove the first message, and not the whole group. Also, make completeGroup a no-op.
Set expireGroupsUponCompletion to force the aggregator to "remove" the group (by calling the modified removeGroup() method).
Here is a diff between SimpleMessageStore and RollingMessageStore...
182,184c190,194
<
< groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
< groupIdToMessageGroup.remove(groupId);
---
> Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
> if (message != null) {
> this.groupUpperBound.release(1);
> this.removeMessageFromGroup(groupId, message);
> }
(plus remove all the code in completeGroup()).
and a test case...
@Test
public void testRolling() {
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    // Expiring on completion makes the aggregator call the modified removeGroup()
    aggregator.setExpireGroupsUponCompletion(true);
    // Release whenever the group holds 3 messages
    aggregator.setReleaseStrategy(new ReleaseStrategy() {
        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);
    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);
    // 3 * 5 * 7
    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(105, reply.getPayload());
    // 5 * 7 * 9
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(315, reply.getPayload());
    // 7 * 9 * 11
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(693, reply.getPayload());
}
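For readers without the gist to hand, the rolling behaviour exercised by the test can be modelled in plain Java. This is only a sketch of the idea, not the RollingMessageStore itself: the class `RollingGroupModel` and its methods are hypothetical, and it uses a simple deque in place of a Spring Integration message group.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for an aggregator backed by a "rolling" store.
final class RollingGroupModel {

    private final Deque<Integer> group = new ArrayDeque<>();
    private final int windowSize;

    RollingGroupModel(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds a payload; returns the product of the window once it is full, otherwise null. */
    public Integer handle(int payload) {
        group.addLast(payload);
        if (group.size() < windowSize) {
            return null; // release condition (size == windowSize) not met yet
        }
        int product = 1;
        for (int value : group) {
            product *= value;
        }
        // Mirrors the modified removeGroup(): drop only the oldest message.
        group.removeFirst();
        return product;
    }

    /** Feeds all payloads through a fresh model and collects the released results. */
    public static List<Integer> run(int windowSize, int... payloads) {
        RollingGroupModel model = new RollingGroupModel(windowSize);
        List<Integer> released = new ArrayList<>();
        for (int payload : payloads) {
            Integer result = model.handle(payload);
            if (result != null) {
                released.add(result);
            }
        }
        return released;
    }

    public static void main(String[] args) {
        // Same payloads as the test case: 3, 5, 7, 9, 11.
        System.out.println(run(3, 3, 5, 7, 9, 11));
    }
}
```

Because only the oldest element is dropped after each release, every new message completes a fresh window of three, which matches the three replies asserted in the test.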
Please go ahead and open a JIRA New Feature Issue and we'll look at adding this (or a more general solution) to the upcoming 3.0 release.
Use correlation-strategy-expression="'foo'" and release-strategy-expression="size()==3".
Upvotes: 1