I'm using Kafka for a voting app in which a user can choose a candidate and change their selection during a one-hour time range.
Since this is a good fit for a KTable, I'm using a Kafka Streams app. However, there is a time-range requirement: I need to groupBy().count() only within a specific time range (e.g. from 10:00 to 11:00).
How can I achieve this using Kafka Stream API?
As far as I know, Kafka (I'm using Kafka 2.3) puts the publish timestamp in the record metadata, but how do I access it? I'm thinking of using .filter() based on the timestamp.
I've also looked at the windowing documentation, but it seems the time there is relative (e.g. the last hour) instead of fixed (10:00-11:00).
Thank you
Timothy,
To access the timestamp of the record, you can use a transformValues() operation. The ValueTransformer you supply has access to the ProcessorContext, and you can call ProcessorContext.timestamp() in the ValueTransformer.transform() method.
If the timestamp is within the desired range, return the value; otherwise return null. Then add a filter() after the transformValues() to remove the records you've rejected.
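The reject-then-filter idea can be checked without a broker. Below is a plain-Java simulation (no Kafka classes involved; the Vote type and countInRange method are hypothetical names for illustration) of what transformValues() + filter() + groupByKey().count() end up computing once a batch of records has been processed. It uses the same inclusive range check as the Kafka example that follows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimeRangeCountSketch {

    // Hypothetical stand-in for a Kafka record.
    static final class Vote {
        final String key;      // e.g. candidate id, what groupByKey() groups on
        final String value;    // e.g. voter id
        final long timestamp;  // what ProcessorContext.timestamp() would return

        Vote(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Simulates transformValues (null when out of range), filter(v != null), then count per key.
    static Map<String, Long> countInRange(List<Vote> votes, long earliest, long latest) {
        Map<String, Long> counts = new TreeMap<>();
        for (Vote v : votes) {
            String transformed = (v.timestamp >= earliest && v.timestamp <= latest) ? v.value : null;
            if (transformed == null) {
                continue; // the filter() step drops rejected records
            }
            counts.merge(v.key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Vote> votes = new ArrayList<>();
        votes.add(new Vote("alice", "voter1", 500));   // before the range: dropped
        votes.add(new Vote("alice", "voter2", 1_000));
        votes.add(new Vote("bob", "voter3", 1_500));
        votes.add(new Vote("alice", "voter4", 2_000));
        votes.add(new Vote("bob", "voter5", 2_500));   // after the range: dropped
        System.out.println(countInRange(votes, 1_000, 2_000)); // {alice=2, bob=1}
    }
}
```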
Here's an example that I think will work:
import java.time.Instant;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

class GroupByTimestampExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // You need to update the time fields; these are just placeholders
        long earliest = Instant.now().toEpochMilli();
        long latest = Instant.now().toEpochMilli() + (60 * 60 * 1000);
        final ValueTransformerSupplier<String, String> valueTransformerSupplier = new TimeFilteringTransformer(earliest, latest);
        // Assumes String serdes are configured as the default key/value serdes
        final KTable<String, Long> voteTable = builder.<String, String>stream("topic")
                .transformValues(valueTransformerSupplier)
                // Drop the records the transformer rejected (returned null for)
                .filter((k, v) -> v != null)
                .groupByKey()
                .count();
        // Pass builder.build() to a KafkaStreams instance and start it (configuration omitted)
    }

    static final class TimeFilteringTransformer implements ValueTransformerSupplier<String, String> {
        private final long earliest;
        private final long latest;

        public TimeFilteringTransformer(final long earliest, final long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        @Override
        public ValueTransformer<String, String> get() {
            return new ValueTransformer<String, String>() {
                private ProcessorContext processorContext;

                @Override
                public void init(ProcessorContext context) {
                    processorContext = context;
                }

                @Override
                public String transform(String value) {
                    // Timestamp of the record currently being processed
                    long ts = processorContext.timestamp();
                    if (ts >= earliest && ts <= latest) {
                        return value;
                    }
                    return null;
                }

                @Override
                public void close() {
                }
            };
        }
    }
}
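The earliest/latest values above are placeholders based on Instant.now(). For a fixed wall-clock range such as 10:00-11:00, one option is to build the bounds with java.time from a date, a time, and a time zone. A minimal sketch; the voting date and the zone are assumed example values, and toEpochMillis is a hypothetical helper name:

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class VotingWindowBounds {

    // Epoch milliseconds for `time` on `date` in `zone` (e.g. 10:00 in Europe/Berlin).
    static long toEpochMillis(LocalDate date, LocalTime time, ZoneId zone) {
        return ZonedDateTime.of(date, time, zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Assumed example values: pick your own voting day and zone.
        LocalDate votingDay = LocalDate.of(2019, 10, 1);
        ZoneId zone = ZoneId.of("UTC");

        long earliest = toEpochMillis(votingDay, LocalTime.of(10, 0), zone);
        long latest = toEpochMillis(votingDay, LocalTime.of(11, 0), zone);

        System.out.println(earliest + " .. " + latest); // 1569924000000 .. 1569927600000
    }
}
```

These two values can be passed to new TimeFilteringTransformer(earliest, latest). Note that the transformer checks ts <= latest, so a vote stamped exactly 11:00:00.000 is still counted; use ts < latest if you want a half-open range that matches tumbling-window semantics.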
Let me know how it goes.
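Actually, tumbling windows are fixed-size, non-overlapping, gap-less windows, and Kafka Streams aligns them to the epoch rather than to the current time (so one-hour windows start on whole UTC hours). In your use case the window size is one hour, so, as in your example, a 10:00-11:00 window will be created (start inclusive, end exclusive):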
kStream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)))
        .count();
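To see why a record lands in the 10:00-11:00 window rather than in "the last hour", here is the epoch-alignment arithmetic for the tumbling case (advance equal to size), sketched in plain Java. windowStart is a hypothetical helper, not a Kafka API; as far as I know it matches what Kafka Streams computes for non-negative timestamps.

```java
import java.time.Duration;
import java.time.Instant;

public class TumblingWindowAlignment {

    // Start of the epoch-aligned tumbling window containing timestampMs.
    // Assumes timestampMs >= 0 (Kafka record timestamps are non-negative).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofHours(1).toMillis();
        long ts = Instant.parse("2019-10-01T10:42:17Z").toEpochMilli();

        long start = windowStart(ts, size);
        long end = start + size; // exclusive upper bound
        System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(end));
        // prints 2019-10-01T10:00:00Z .. 2019-10-01T11:00:00Z
    }
}
```

Because the alignment is on UTC hour boundaries, a local 10:00-11:00 range in a zone with a non-whole-hour offset (e.g. Asia/Kolkata, UTC+05:30) does not coincide with any one-hour window; the timestamp-filter approach fits that case better. The windowed count is a KTable<Windowed<String>, Long>, so to keep only one window you can filter on the key's window().start().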