Reputation: 317
I have the below configuration:
Flink job snippet:
speStream.addSink(new FlinkKafkaProducer011<>(kafkaTopicName, new SimpleStringSchema(), props));
Scenario 1: I have written a Flink job (producer) in Eclipse which reads a file from a folder and puts the messages on a Kafka topic. When I run this code from Eclipse, it works fine. For example, if I place a file with 100 records, Flink sends some messages to partition 1 and some to partition 2, and hence both consumers get some of the messages.
Scenario 2: When I create a jar of the above code and run it on the Flink server, Flink sends all the messages to a single partition, and hence only one consumer gets all the messages.
I want the behavior of scenario 1 with the jar created in scenario 2.
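For reference, a minimal sketch of the kind of job described above (the class name, file path, and broker address are illustrative assumptions, not from my actual setup):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class FileToKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

        // Read the input file line by line and write each line to the Kafka topic
        DataStream<String> speStream = env.readTextFile("/path/to/input/file.txt");
        speStream.addSink(new FlinkKafkaProducer011<>("kafkaTopicName", new SimpleStringSchema(), props));

        env.execute("File to Kafka");
    }
}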
Upvotes: 2
Views: 2144
Reputation: 826
For Flink Kafka producers, disable the custom partitioner by passing an empty value as the last constructor parameter. The FlinkKafkaProducer011 constructor takes an Optional<FlinkKafkaPartitioner>, so pass Optional.empty() (older connectors such as FlinkKafkaProducer010, whose constructor takes a FlinkKafkaPartitioner directly, accept a plain null instead):
speStream.addSink(new FlinkKafkaProducer011<>(
    kafkaTopicName,
    new SimpleStringSchema(),
    props,
    Optional.empty())
);
The short explanation is that this stops Flink from using its default partitioner, FlinkFixedPartitioner. With the default partitioner turned off, Kafka distributes the data amongst its partitions as it sees fit. If it is NOT turned off, each parallel task slot used for the sink that utilizes the FlinkKafkaProducer will only write to one partition.
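To make the fixed mapping concrete, here is a simplified sketch of what FlinkFixedPartitioner does (an illustration of the idea, not the connector's actual source):

// Each parallel sink subtask is pinned to a single Kafka partition:
// partitions[subtaskIndex % partitions.length]
public class FixedPartitionerSketch {
    static int partitionFor(int subtaskIndex, int[] partitions) {
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1};
        // With sink parallelism 1 there is a single subtask (index 0),
        // so every record lands in partition 0 -- the scenario 2 behavior.
        System.out.println(partitionFor(0, partitions)); // always 0
        // With parallelism 2, subtask 1 would write to partition 1.
        System.out.println(partitionFor(1, partitions)); // 1
    }
}

This also explains why the Eclipse run spread the messages: a local environment typically defaults to a parallelism equal to the number of CPU cores, so several subtasks each wrote to their own partition, while a cluster submission often defaults to parallelism 1.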
Upvotes: 2
Reputation: 3422
If you do not provide a FlinkKafkaPartitioner and do not explicitly tell the producer to use Kafka's partitioner, a FlinkFixedPartitioner will be used, meaning that all events from one task will end up in the same partition.
To use Kafka's partitioner, use this ctor:
speStream.addSink(new FlinkKafkaProducer011<>(kafkaTopicName, new SimpleStringSchema(), props, Optional.empty()));
The difference between running from the IDE and running the jar on the Flink server is probably due to a different parallelism setup or partitioning within Flink.
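If that is the cause, pinning the parallelism explicitly should make both environments behave the same. A short sketch, assuming speStream, kafkaTopicName, and props are defined as in the question:

import java.util.Optional;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // or submit with: flink run -p 2 yourJob.jar

speStream
    .addSink(new FlinkKafkaProducer011<>(kafkaTopicName, new SimpleStringSchema(), props, Optional.empty()))
    .setParallelism(2); // parallelism can also be set per sink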
Upvotes: 1