chandan kharbanda

Reputation: 199

How to throttle Flink output to Kafka?

I want to send 100 messages/second from my stream to a Kafka topic. The stream has more than enough data to sustain that rate.

So far I have found the windowing concept, but I have not been able to adapt it to my use case.

Upvotes: 0

Views: 631

Answers (2)

spearkkk

Reputation: 21

I wrote the following function for Flink v1.15.

For background, see the checkpointing_under_backpressure and process_function pages of the Flink documentation.

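import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Passes the first `count` elements per key straight through, then sleeps
// `millisecond` ms before each further element until a timer resets the counter.
public class RateLimitFunction extends KeyedProcessFunction<String, String, String> {

    // Elements emitted for the current key since the last reset.
    private transient ValueState<Long> counter;
    // Processing time at which the limit was first hit; null while under the limit.
    private transient ValueState<Long> lastTimestamp;

    private final Long count;
    private final Long millisecond;

    public RateLimitFunction(Long count, Long millisecond) {
        this.count = count;
        this.millisecond = millisecond;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        counter = getRuntimeContext()
          .getState(new ValueStateDescriptor<>("counter", TypeInformation.of(Long.class)));

        lastTimestamp = getRuntimeContext()
          .getState(new ValueStateDescriptor<>("last-timestamp", TypeInformation.of(Long.class)));
    }

    @Override
    public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
      Collector<String> out) throws Exception {

        // Timer for "now": it fires after this call returns and runs the reset check in onTimer.
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());

        // Read the state once; it is null before the first element of a key.
        Long stored = counter.value();
        long current = stored == null ? 0L : stored;
        if (current < count) {
            counter.update(current + 1L);
            out.collect(value);
        } else {
            // Limit reached: remember when, so onTimer knows when to reset.
            if (lastTimestamp.value() == null) {
                lastTimestamp.update(ctx.timerService().currentProcessingTime());
            }
            // Blocks the task thread, which backpressures the upstream operators.
            Thread.sleep(millisecond);
            out.collect(value);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Reset once at least `millisecond` ms have passed since the limit was hit.
        Long last = lastTimestamp.value();
        if (last != null && last + millisecond <= timestamp) {
            counter.update(0L);
            lastTimestamp.update(null);
        }
    }
}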

Upvotes: 0

David Anderson

Reputation: 43707

You could do this easily with a ProcessFunction. You would keep a counter in Flink state, and only emit elements when the counter is less than 100. Meanwhile, use a timer to reset the counter to zero once a second.
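
The counter-and-reset logic described here can be sketched without any Flink dependency, which makes it easy to reason about in isolation. The class below is only an illustration: the names FixedWindowThrottle and tryEmit are made up for this sketch, the caller passes in the current time, and the check at the top of tryEmit stands in for the timer that would reset the counter. In a real job the counter would presumably live in Flink state and the reset would be driven by a processing-time timer. The answer does not say what happens to elements that arrive once the limit is reached, so this sketch just reports them as rejected and leaves the choice (buffer, drop, or block) to the caller.

```java
/** Framework-free sketch: allow at most maxPerWindow elements per window. */
final class FixedWindowThrottle {
    private final long maxPerWindow;   // e.g. 100 messages
    private final long windowMillis;   // e.g. 1000 ms

    private long windowStart;          // start time of the current window
    private long emitted;              // elements let through in this window
    private boolean started;           // false until the first element arrives

    FixedWindowThrottle(long maxPerWindow, long windowMillis) {
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
    }

    /**
     * Returns true if an element arriving at nowMillis may be emitted,
     * false if the limit for the current window is already used up.
     */
    boolean tryEmit(long nowMillis) {
        // Stand-in for the timer: open a new window and zero the counter
        // once the current window has run its full length.
        if (!started || nowMillis - windowStart >= windowMillis) {
            windowStart = nowMillis;
            emitted = 0;
            started = true;
        }
        if (emitted < maxPerWindow) {
            emitted++;
            return true;
        }
        return false;
    }
}
```

For the rate in the question, new FixedWindowThrottle(100, 1000L) would let through the first 100 elements of each one-second window and reject the rest until the next window starts.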

Upvotes: 3
