Reputation: 3
I'm prototyping a fraud-detection application. We'll frequently have metrics like "total amount of cash transactions in the last 5 days" that we need to compare against some threshold to determine whether to raise an alert.
We're looking to use Kafka Streams to create and maintain the aggregates and then create an enhanced version of the incoming transaction that has the original transaction fields plus the aggregates. This enhanced record gets processed by a downstream rules system.
I'm wondering about the best way to approach this. I've prototyped creating the aggregates with code like this:
TimeWindows twoDayHopping = TimeWindows.of(TimeUnit.DAYS.toMillis(2))
                                       .advanceBy(TimeUnit.DAYS.toMillis(1));

KStream<String, AdditiveStatistics> aggrStream = transactions
    .filter((key, value) -> value.getAccountTypeDesc().equals("P") &&
                            value.getPrimaryMediumDesc().equals("CASH"))
    .groupByKey()
    .aggregate(AdditiveStatistics::new,
               (key, value, accumulator) ->
                   AdditiveStatsUtil.advance(value.getCurrencyAmount(), accumulator),
               twoDayHopping,
               metricsSerde,
               "sas10005_store")
    .toStream()
    // re-key from Windowed<String> back to the plain key, keeping the window start in the value
    .map((key, value) -> {
        value.setTransDate(key.window().start());
        return new KeyValue<>(key.key(), value);
    })
    .through(Serdes.String(), metricsSerde, datedAggrTopic);
This creates a store-backed stream with one record per key per window. I then join the original transactions stream against this windowed aggregate stream to produce the final output to a topic:
JoinWindows joinWindow = JoinWindows.of(TimeUnit.DAYS.toMillis(1))
                                    .before(TimeUnit.DAYS.toMillis(1))
                                    .after(-1)
                                    .until(TimeUnit.DAYS.toMillis(2) + 1);

KStream<String, Transactions10KEnhanced> enhancedTrans = transactions.join(aggrStream,
    (left, right) -> {
        Transactions10KEnhanced out = new Transactions10KEnhanced();
        out.setAccountNumber(left.getAccountNumber());
        out.setAccountTypeDesc(left.getAccountTypeDesc());
        out.setPartyNumber(left.getPartyNumber());
        out.setPrimaryMediumDesc(left.getPrimaryMediumDesc());
        out.setSecondaryMediumDesc(left.getSecondaryMediumDesc());
        out.setTransactionKey(left.getTransactionKey());
        out.setCurrencyAmount(left.getCurrencyAmount());
        out.setTransDate(left.getTransDate());
        if (right != null) {
            out.setSum2d(right.getSum());
        }
        return out;
    },
    joinWindow);
This produces the correct results, but it seems to run for quite a while, even with a low number of records. I'm wondering if there's a more efficient way to achieve the same result.
Upvotes: 0
Views: 1180
Reputation: 62320
It's a config issue: cf. http://docs.confluent.io/current/streams/developer-guide.html#memory-management
Disabling caching by setting the cache size to zero (parameter cache.max.bytes.buffering in StreamsConfig) will resolve the "delayed" delivery to the output topic.
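For example, a minimal sketch of setting that config (the application id and bootstrap servers are placeholders; builder stands for your existing topology):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-aggregates");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// disable record caching so every aggregation update is forwarded downstream immediately
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();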
You might also read this blog post for some background information about Streams design: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
Upvotes: 1