Reputation: 199
I want to send 100 messages/second
from my stream to a kafka topic. I have more than enough data in stream to do so.
So far, I have found windowing concept, but I am unable to modify it to my use case.
Upvotes: 0
Views: 631
Reputation: 21
Flink v1.15, I created function.
Refer to checkpointing_under_backpressure and process_function.
public class RateLimitFunction extends KeyedProcessFunction<String, String, String> {
private transient ValueState<Long> counter;
private transient ValueState<Long> lastTimestamp;
private final Long count;
private final Long millisecond;
public RateLimitFunction(Long count, Long millisecond) {
this.count = count;
this.millisecond = millisecond;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
counter = getRuntimeContext()
.getState(new ValueStateDescriptor<>("counter", TypeInformation.of(Long.class)));
lastTimestamp = getRuntimeContext()
.getState(new ValueStateDescriptor<>("last-timestamp", TypeInformation.of(Long.class)));
}
@Override
public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
Collector<String> out) throws Exception {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());
long current = counter.value() == null ? 0L : counter.value();
if (current < count) {
counter.update(current + 1L);
out.collect(value);
} else {
if (lastTimestamp.value() == null) {
lastTimestamp.update(ctx.timerService().currentProcessingTime());
}
Thread.sleep(millisecond);
out.collect(value);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
if (lastTimestamp.value() != null && lastTimestamp.value() + millisecond <= timestamp) {
counter.update(0L);
lastTimestamp.update(null);
}
}
}
Upvotes: 0
Reputation: 43707
You could do this easily with a ProcessFunction
. You would keep a counter in Flink state, and only emit elements when the counter is less than 100. Meanwhile, use a timer to reset the counter to zero once a second.
Upvotes: 3