sergiopf
sergiopf

Reputation: 61

Flink and Kafka parallelism optimization

I have a project in Flink which I want to optimize. I have set the the default parallelism and slots to 4 (the server has 4 cores).

taskmanager.numberOfTaskSlots = 4
parallelism.default = 4

This is my configuration to run the task, but the processing time is the same using parallelism or not. During my tests, is taking about 3 minutes to process 30MB from a Kafka queue with 5 partitions.

public void run() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fallBackRestart());
    // Get the Accounts DataSource
    BroadcastStream<PropertyInfo> propertyInfoBroadcastStream = getBroadcastPropertyStream(env);

    // Get the DataSource
    DataStream<CollectionMessage> collectionMessageDataStream = getCollectionMessageStream(env);
    final var router = new KeyedProcessAccumulatorRouterImpl(config);
    final Duration sessionGapDuration = config.get(SESSION_EVENT_GAP_MINUTES);
    SessionKeyedProcessFunction sessionKeyedProcessFunction = new SessionKeyedProcessFunction(
            router, sessionGapDuration, config);

        collectionMessageDataStream
          .keyBy(CollectionMessage::getSessionId)
          .connect(propertyInfoBroadcastStream)
          .process(sessionKeyedProcessFunction)
          .uid("SessionWindow")
          .name("Session Window")
          .setParallelism(4);

    // execute program
    env.execute("Processor");
}

private DataStream<CollectionMessage> getCollectionMessageStream(
        StreamExecutionEnvironment env) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", config.getString(KAFKA_CONSUMER_SERVERS));
    properties.setProperty("group.id", config.getString(KAFKA_TOPROCESS_TOPIC));
    properties.setProperty("max.partition.fetch.bytes",
        config.getString(KAFKA_TOPROCESS_MAX_BYTES));
    FlinkKafkaConsumer<RawCollection> myConsumer = new FlinkKafkaConsumer<>(
        config.getString(KAFKA_TOPROCESS_TOPIC), new KafkaMessageDeserializer(), properties);

    // Take lines from file from files
    DataStream<RawCollection> inputMessageStream =
        env.addSource(myConsumer).setParallelism(4);

    B64PayloadDeserializer b64PayloadDeserializer =
        new B64PayloadDeserializer(new BaseCollectionMessageDeserializer());

    // Map lines to messages
    DataStream<CollectionMessage> collectionMessageDataStream =
        inputMessageStream
            .map(b64PayloadDeserializer::deserialize).setParallelism(4)
            .uid("CollectionMessageFilter")
            .name("Filter Collection Messages").setParallelism(4);

    // Assign new watermark on messages based on event time
    return collectionMessageDataStream;
}

Looking the Flink dashboard I see 4 slots and each of the 4 subtasks busy to near 100%. Executing it locally and stoping in the class SessionKeyedProcessFunction I see 4 tasks parallelized. What can be happening to not optimize performance?

enter image description here

Upvotes: 0

Views: 355

Answers (1)

Dominik Wosiński
Dominik Wosiński

Reputation: 3874

Generally, this is a little bit trickier than just increasing parallelism and expecting a significant speedup. Several things to check that may be causing less than expected performance increase with parallelism:

  1. How are messages partitioned on Kafka? You have 5 partitions, but how many messages are actually in those partitions? Maybe one partition is super hot and causes all the work to be done by one operator instance.
  2. How is sessionId distributed? Perhaps there is a skew there, which causes one operator instance to do most of the work.
  3. How do You measure these 3 minutes? What part of those 3 minutes is actually work being done and not initialization, waiting and cleanup? Maybe the sample You have for tests is simply to small to actually show speedup, because most of the time is related to non-parallelizable work.

Upvotes: 1

Related Questions