Reputation: 61
I have a Flink project that I want to optimize. I have set the default parallelism and the number of task slots to 4 (the server has 4 cores).
taskmanager.numberOfTaskSlots = 4
parallelism.default = 4
This is my configuration for running the job, but the processing time is the same with or without parallelism. In my tests, it takes about 3 minutes to process 30 MB from a Kafka topic with 5 partitions.
public void run() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fallBackRestart());

    // Get the Accounts DataSource
    BroadcastStream<PropertyInfo> propertyInfoBroadcastStream = getBroadcastPropertyStream(env);

    // Get the DataSource
    DataStream<CollectionMessage> collectionMessageDataStream = getCollectionMessageStream(env);

    final var router = new KeyedProcessAccumulatorRouterImpl(config);
    final Duration sessionGapDuration = config.get(SESSION_EVENT_GAP_MINUTES);
    SessionKeyedProcessFunction sessionKeyedProcessFunction = new SessionKeyedProcessFunction(
            router, sessionGapDuration, config);

    collectionMessageDataStream
            .keyBy(CollectionMessage::getSessionId)
            .connect(propertyInfoBroadcastStream)
            .process(sessionKeyedProcessFunction)
            .uid("SessionWindow")
            .name("Session Window")
            .setParallelism(4);

    // execute program
    env.execute("Processor");
}
private DataStream<CollectionMessage> getCollectionMessageStream(
        StreamExecutionEnvironment env) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", config.getString(KAFKA_CONSUMER_SERVERS));
    properties.setProperty("group.id", config.getString(KAFKA_TOPROCESS_TOPIC));
    properties.setProperty("max.partition.fetch.bytes",
            config.getString(KAFKA_TOPROCESS_MAX_BYTES));

    FlinkKafkaConsumer<RawCollection> myConsumer = new FlinkKafkaConsumer<>(
            config.getString(KAFKA_TOPROCESS_TOPIC), new KafkaMessageDeserializer(), properties);

    // Read raw messages from the Kafka topic
    DataStream<RawCollection> inputMessageStream =
            env.addSource(myConsumer).setParallelism(4);

    B64PayloadDeserializer b64PayloadDeserializer =
            new B64PayloadDeserializer(new BaseCollectionMessageDeserializer());

    // Map raw messages to collection messages
    DataStream<CollectionMessage> collectionMessageDataStream =
            inputMessageStream
                    .map(b64PayloadDeserializer::deserialize).setParallelism(4)
                    .uid("CollectionMessageFilter")
                    .name("Filter Collection Messages").setParallelism(4);

    // Assign new watermark on messages based on event time
    return collectionMessageDataStream;
}
Looking at the Flink dashboard, I see 4 slots, and each of the 4 subtasks is busy at close to 100%. When I run it locally and stop at a breakpoint in the SessionKeyedProcessFunction class, I see 4 tasks running in parallel. What could be preventing the parallelism from improving performance?
Reputation: 3874
Generally, this is a little trickier than just increasing parallelism and expecting a significant speedup. Several things to check that may be causing a smaller-than-expected performance gain from parallelism:
How is sessionId distributed? Perhaps there is skew there, which causes one operator instance to do most of the work.
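One rough way to check the sessionId distribution is to take a sample of the session IDs from the topic and measure how much of the traffic the most frequent key accounts for. The sketch below is plain Java with no Flink dependency. The class SessionSkew and its methods are hypothetical names made up for this illustration, and it only looks at the keys themselves: Flink additionally hashes each key into a key group before assigning it to a subtask, so the real per-subtask load can differ somewhat from what this shows.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink) for spotting key skew in a sample of session IDs. */
public class SessionSkew {

    /** Counts how many records in the sample carry each session ID. */
    public static Map<String, Long> countPerKey(List<String> sessionIds) {
        Map<String, Long> counts = new HashMap<>();
        for (String id : sessionIds) {
            counts.merge(id, 1L, Long::sum);
        }
        return counts;
    }

    /** Returns the fraction of all records held by the most frequent key (0.0 for an empty sample). */
    public static double heaviestShare(Map<String, Long> counts) {
        long total = 0;
        long max = 0;
        for (long c : counts.values()) {
            total += c;
            max = Math.max(max, c);
        }
        return total == 0 ? 0.0 : (double) max / total;
    }

    public static void main(String[] args) {
        // Made-up sample; in practice these would be session IDs read from the topic.
        List<String> sample = List.of("s1", "s1", "s1", "s2", "s3", "s1");
        Map<String, Long> counts = countPerKey(sample);
        System.out.println(counts.size() + " distinct keys, heaviest share = "
                + heaviestShare(counts));
    }
}
```

If the heaviest share is large, or if there are fewer distinct session IDs than parallel subtasks, then keyBy cannot spread the work evenly no matter how many slots are available.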