Reputation: 10469
I'm looking at some Kafka topics that generate ~30K messages/second. I have a Flink topology set up to read one of these, aggregate a bit (5-second window), and then (eventually) write to a DB.
When I run my topology and remove everything but the read -> aggregate steps, I can only get ~30K messages per minute. There isn't anywhere for backpressure to occur.
What am I doing wrong?
It appears that I'm only able to get ~1.5 MB/s (at ~30K messages/minute, i.e. ~500 messages/second, that's roughly 3 KB per message). Not very close to the 100 MB/s mentioned.
The current code path:
DataStream<byte[]> dataStream4 = env
        .addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties()))
        .setParallelism(1);

DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4
        .rebalance()
        .flatMap(new mapper2("data_4"))
        .setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private final String mapId;

    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        // Thrift-decode the raw bytes, then emit (id, otherId, section count, map id)
        TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(bytes);
        Tuple4<Long, Long, Integer, String> tuple4 = new Tuple4<>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;
        collector.collect(tuple4);
    }
}
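For reference, the 5-second aggregation step I removed for this test sits after this map. A rough sketch of that kind of step (the key and aggregate here are illustrative, not my actual code, and assume the Flink 0.10-era window API):

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative only: tumbling 5-second window keyed by the first tuple field,
// summing the section counts (field f2).
DataStream<Tuple4<Long, Long, Integer, String>> aggregated = ds4
        .keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .sum(2);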
Upvotes: 3
Views: 4001
Reputation: 3172
I've never used Flink or its KafkaConsumer, but I have experience with Kafka in a Storm environment. There are a lot of variables at play in how fast Kafka consumption goes. Here are some things to think about and investigate; add more details to your question when you have them.
There could be a lot of reasons why it's consuming slowly; I've tried to highlight some of the general Kafka-related stuff. I'm sure there are things you can do in Flink to speed up consumption that I don't know about, because I've never used it.
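As a concrete starting point, here's a minimal sketch of the consumer properties I'd experiment with, assuming the Kafka 0.8 high-level consumer settings that FlinkKafkaConsumer081 passes through (hosts and values are placeholders, not recommendations):

import java.util.Properties;

Properties props = new Properties();
// Connection settings the 0.8 consumer needs (placeholder hosts):
props.setProperty("zookeeper.connect", "zk-host:2181");
props.setProperty("group.id", "flink-throughput-test");
props.setProperty("bootstrap.servers", "kafka-host:9092");
// Tuning knobs that often affect 0.8 consumer throughput (illustrative values):
props.setProperty("fetch.message.max.bytes", "2097152");     // max bytes fetched per partition per request
props.setProperty("socket.receive.buffer.bytes", "1048576"); // TCP receive buffer for the consumer
props.setProperty("num.consumer.fetchers", "2");             // number of fetcher threads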
Upvotes: 1
Reputation: 4542
From the code, I see two potential components which could cause the performance issues: the Kafka source itself and the Thrift deserialization in the FlatMap.
In order to understand where the bottleneck is, I would first measure the raw read performance of Flink reading from the Kafka topic.
Therefore, can you run the following code on your cluster?
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawKafka {
    private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // RawSchema is your DeserializationSchema that returns the raw bytes.
        DataStream<byte[]> dataStream4 = env
                .addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties()))
                .setParallelism(1);

        dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
            long received = 0;
            long logfreq = 50000;
            long lastLog = -1;
            long lastElements = 0;

            @Override
            public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
                received++;
                if (received % logfreq == 0) {
                    long now = System.currentTimeMillis();
                    if (lastLog == -1) {
                        // first measurement point: just initialize
                        lastLog = now;
                        lastElements = received;
                    } else {
                        // throughput for the last "logfreq" elements
                        long timeDiff = now - lastLog;
                        long elementDiff = received - lastElements;
                        double ex = (1000 / (double) timeDiff);
                        // "GB received" assumes ~2500 bytes per element
                        LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                                timeDiff, elementDiff, elementDiff * ex, (received * 2500) / 1024 / 1024 / 1024);
                        // re-initialize for the next interval
                        lastLog = now;
                        lastElements = received;
                    }
                }
            }
        });
        env.execute("Raw kafka throughput");
    }
}
This code measures the time between every "logfreq" elements read from Kafka and logs the number of elements read so far. On my local machine I got a throughput of ~330k elements/second/core:
16:09:34,028 INFO RawKafka - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
I'm really interested to see what throughput you are achieving when reading from Kafka.
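If the raw read throughput turns out to be high, a next step could be timing the Thrift decoding on its own. A minimal sketch (the loadOneSampleMessage() helper and file path are hypothetical; ts_thriftDecoder and TimeData are the types from your snippet):

public class DecodeBenchmark {
    public static void main(String[] args) throws Exception {
        // One raw message captured from the topic (hypothetical helper below).
        byte[] sample = loadOneSampleMessage();
        int iterations = 1_000_000;
        long checksum = 0; // consume a field so the JIT can't eliminate the decode
        long start = System.currentTimeMillis();
        for (int i = 0; i < iterations; i++) {
            TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(sample);
            checksum += timeData.getId();
        }
        long elapsedMs = System.currentTimeMillis() - start;
        System.out.println(iterations + " decodes in " + elapsedMs + " ms => "
                + (iterations * 1000L / elapsedMs) + " decodes/second (checksum " + checksum + ")");
    }

    private static byte[] loadOneSampleMessage() throws Exception {
        // e.g. a single serialized TimeData message dumped to a file beforehand
        return java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("sample_message.bin"));
    }
}

If decoding alone is far below the raw read rate, the FlatMap is your bottleneck; if not, the Kafka side deserves more attention.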
Upvotes: 5