Hrishikesh Mishra

Reputation: 3433

Spark Streaming Direct Kafka Consumers are not evenly distributed across executors

I have created a sample direct Kafka stream in Spark. The Kafka topic has 30 partitions, but all consumers are running on the same executor machine.

Kafka Manager screenshot.

As per my understanding of the direct Kafka stream, the driver computes the offset ranges and hands them to the executors, which then poll Kafka with them.
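
To confirm where each partition is actually being processed, a minimal sketch (it reuses the kafkaStream1 from the sample below; the log message is illustrative) is to print the executor host per partition:

import java.net.InetAddress;
import org.apache.spark.TaskContext;

// Log which executor host handles which RDD partition of the batch.
kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
    String host = InetAddress.getLocalHost().getHostName();
    System.out.println("Partition " + TaskContext.get().partitionId() + " ran on " + host);
}));

If the partitions were spread correctly, the host names would vary across executors.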

Spark Version: 2.4

Sample code below:



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;
import java.util.HashMap;


public class Main {


    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");


        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));



        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "test-topic-1");

        // Simulate ~2 ms of work per record
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            System.out.println("Processing test-topic-1");
            try {
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // Build ranges that run from offset 0 up to each partition's begin offset
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);
            rdd.foreachPartition(partition -> {
                OffsetRange o = beginOffsets[TaskContext.get().partitionId()];

            });
            // Commits the begin offsets, not the just-processed ranges, from the driver
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(beginOffsets);
        });



        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ids>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"hrishi-testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}
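
How partitions map to executors is controlled by the LocationStrategy passed to createDirectStream. PreferConsistent, used above, is meant to spread partitions evenly across available executors. As a sketch of the alternatives in spark-streaming-kafka-0-10 (the host name in PreferFixed is hypothetical):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;

// PreferConsistent: distribute partitions evenly across available executors.
LocationStrategy consistent = LocationStrategies.PreferConsistent();

// PreferBrokers: only when executors run on the same hosts as the Kafka brokers.
LocationStrategy brokers = LocationStrategies.PreferBrokers();

// PreferFixed: pin specific TopicPartitions to specific hosts;
// partitions not in the map fall back to a consistent placement.
Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("test-topic-1", 0), "executor-host-1");
LocationStrategy fixed = LocationStrategies.PreferFixed(hostMap);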


Upvotes: 1

Views: 501

Answers (1)

Hrishikesh Mishra

Reputation: 3433

I found the issue: it was happening because of the way I was committing offsets from the driver, with this call:

((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);

After changing it to commit the batch's offset ranges only after the partitions have been processed, the foreachRDD looks like this:

kafkaStream1.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(partition -> {
        partition.forEachRemaining(e -> {
            try {
                System.out.println("hrishi mess" + e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });
    });
    // Commit the current batch's offset ranges from the driver after processing
    ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
});
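
To verify that the commits actually succeed, commitAsync also accepts an OffsetCommitCallback; a minimal sketch (the log messages are illustrative):

// Callback variant of commitAsync: reports whether each async commit landed.
((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
    if (exception != null) {
        System.err.println("Offset commit failed: " + exception);
    } else {
        System.out.println("Committed offsets: " + offsets);
    }
});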

Next, I enabled debug logging on the executors and found that KafkaRDD was polling Kafka on the executors; it is clearly visible in the logs.
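
Executor log levels are normally configured through the executors' log4j.properties, but as a sketch of what that amounts to with the log4j 1.x API bundled with Spark 2.4, the equivalent call is:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Raise the Spark Kafka integration package (including KafkaRDD) to DEBUG.
// To affect executors this must run in the executor JVMs,
// e.g. from inside a foreachPartition, not only on the driver.
Logger.getLogger("org.apache.spark.streaming.kafka010").setLevel(Level.DEBUG);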

Upvotes: 1
