Reputation: 28289
I'm using Flink to read from Kafka and write to Redis.
For testing, I only want to read the first 10 messages from Kafka, so I use a counter and try to stop the consumer when the counter reaches 10:
AtomicInteger counter = new AtomicInteger(0);
FlinkKafkaConsumer08<String> kafkaConsumer =
        new FlinkKafkaConsumer08<>("my topic",
                new SimpleStringSchema() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        // It should only read 10 Kafka messages
                        return counter.getAndIncrement() > 9;
                    }
                },
                properties);
but I get 30 messages in Redis:
llen rtp:example
(integer) 30
When I change the condition to counter.getAndIncrement() > 8, it writes 27 messages to Redis. The count is always triple what I expect.
Complete code:
public class FlinkEntry {

    private final static JedisCluster JEDIS_CLUSTER;

    static {
        Set<HostAndPort> hostAndPorts = new HashSet<>();
        hostAndPorts.add(new HostAndPort("localhost", 7001));
        JEDIS_CLUSTER = new JedisCluster(hostAndPorts);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer08<String> kafkaConsumer = createKafkaConsumer();
        DataStream<String> dataStream = environment.addSource(kafkaConsumer);

        SinkFunction<String> redisSink = createRedisSink();
        dataStream.addSink(redisSink);

        environment.execute();
    }

    private static FlinkKafkaConsumer08<String> createKafkaConsumer() {
        Properties properties = new Properties();
        //... set kafka property
        AtomicInteger counter = new AtomicInteger(0);
        FlinkKafkaConsumer08<String> kafkaConsumer =
                new FlinkKafkaConsumer08<>("my topic",
                        new SimpleStringSchema() {
                            @Override
                            public boolean isEndOfStream(String nextElement) {
                                // It should only read 10 Kafka messages
                                return counter.getAndIncrement() > 9;
                            }
                        },
                        properties);
        kafkaConsumer.setStartFromLatest();
        return kafkaConsumer;
    }

    private static SinkFunction<String> createRedisSink() {
        return new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) {
                JEDIS_CLUSTER.lpush("rtp:example", value);
                JEDIS_CLUSTER.expire("rtp:example", 10 * 60);
            }
        };
    }
}
Upvotes: 2
Views: 808
Reputation: 43697
One approach to understanding this would be to disable operator chaining by calling
env.disableOperatorChaining();
and then look at some metrics -- e.g., numRecordsOut at the source and numRecordsIn at the sink. I would also double-check that the whole job is running with the parallelism set to 1.
You'll need to disable chaining because otherwise the whole job will collapse into a single task, and there won't be any metrics collected for the communication between the two operators.
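A minimal sketch of that diagnostic setup (assuming the rest of the job from the question stays unchanged):

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

// Break operator chaining so the source and sink run as separate tasks;
// only then does Flink report numRecordsOut / numRecordsIn for the edge
// between them.
environment.disableOperatorChaining();

// Pin the whole job to a single parallel instance. If the parallelism is
// greater than 1, each source subtask deserializes (and counts) on its own.
environment.setParallelism(1);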
Upvotes: 1