Amanpreet Khurana

Reputation: 549

KafkaConsumer is not safe for multi-threaded access

I am using the code below to read from a Kafka topic and process the data.

JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record))
                .transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() {
                    JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();
                    StructType schema = DataTypes.createStructType(fields);

                    public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception {
                        records = rdd.union(records);
                        return rdd;
                    }
        });

       transformedMessages.foreachRDD(record -> {
            //System.out.println("Aman" +record.count());
            StructType schema = DataTypes.createStructType(fields);

            Dataset ds = ss.createDataFrame(records, schema);
            ds.createOrReplaceTempView("trades");
            System.out.println(ds.count());
            ds.show();

        });

While running the code, I am getting the exception below:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)

Given that I only have one DStream, I am not sure why I am getting this exception. I am reading from 3 partitions of a Kafka topic, and I assume that createDirectStream creates 3 consumers to read the data.
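For reference, the stream is created with the Kafka 0-10 direct API roughly along these lines (the topic name, group id, and broker address are placeholders here, not my exact configuration):

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "trades-consumer-group");   // placeholder group id

    JavaInputDStream<ConsumerRecord<String, String>> messages =
            KafkaUtils.createDirectStream(
                    jssc,                                   // JavaStreamingContext
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(
                            Arrays.asList("trades"), kafkaParams));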

Below is the code for the KafkaConsumer acquire method:

 private void acquire() {
        this.ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        // Only the thread that already owns the consumer, or the first thread to
        // claim an unowned one (currentThread == -1L), may proceed; any other
        // thread fails fast with the exception seen above.
        if(threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        } else {
            this.refcount.incrementAndGet();
        }
    }

Upvotes: 11

Views: 15870

Answers (4)

Naga

Reputation: 1253

In this piece of code, you perform two actions (count() and show()) on the same data:

 transformedMessages.foreachRDD(record -> {
        //System.out.println("Aman" +record.count());
        StructType schema = DataTypes.createStructType(fields);

        Dataset ds = ss.createDataFrame(records, schema);
        ds.createOrReplaceTempView("trades");

        System.out.println(ds.count());
        ds.show();

    });

Without caching, each action recomputes the Dataset from its Kafka source, so two consumers from the same consumer group tried to read the same Kafka topic partition, and Kafka allows only one consumer from a consumer group to read a given partition at a time. The solution for this issue is to cache the RDD:

 transformedMessages.foreachRDD(record -> {
        //System.out.println("Aman" +record.count());
        StructType schema = DataTypes.createStructType(fields);

        Dataset ds = ss.createDataFrame(records, schema);
        ds.cache();

        System.out.println(ds.count());
        ds.show();

    });
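Caching helps because the first action materializes the batch and later actions are served from the cache instead of triggering a second read of the Kafka partitions. A slightly fuller sketch of the same pattern, operating on the per-batch RDD and adding an unpersist() at the end of the batch (the unpersist() is an extra hygiene step, not something the fix requires):

    transformedMessages.foreachRDD(rdd -> {
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> ds = ss.createDataFrame(rdd, schema);
        ds.cache();                      // mark the batch for caching (lazy)
        ds.createOrReplaceTempView("trades");

        System.out.println(ds.count()); // first action: reads Kafka, fills the cache
        ds.show();                      // second action: served from the cache

        ds.unpersist();                 // release the cached batch before the next one
    });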

Upvotes: 0

Al Elizalde

Reputation: 377

This is the same kind of problem as java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access: you have more than one thread using the same consumer, and KafkaConsumer does not support multi-threaded access. Also make sure you are not running with spark.speculation=true, as speculative task execution will cause the error mentioned above.
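If you want to be explicit about it, speculation can be disabled on the SparkConf (false is already the default, so this only matters if it was enabled somewhere else):

    SparkConf conf = new SparkConf()
            .setAppName("kafka-streaming-app")   // placeholder app name
            .set("spark.speculation", "false");  // speculative task execution off (the default)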

Upvotes: 1

Ehud Lev

Reputation: 2901

Spark 2.2.0 has a workaround that avoids the consumer cache: just set spark.streaming.kafka.consumer.cache.enabled to false. Take a look at this pull request.
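For example (the rest of the configuration is assumed):

    SparkConf conf = new SparkConf()
            .set("spark.streaming.kafka.consumer.cache.enabled", "false");

With the cache disabled, each task creates its own short-lived consumer instead of sharing a cached one, at the cost of reconnecting to Kafka per task.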

Upvotes: 9

Sergey

Reputation: 26

As described in this bug report (https://issues.apache.org/jira/browse/SPARK-19185), this is a known issue with the Spark/Kafka integration.

In my case, I am going to avoid using window, and instead use partitioning in combination with batchInterval and blockInterval, as described here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
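For instance, the block interval is a plain Spark setting (it affects receiver-based streams) and the batch interval is set when the streaming context is created; both values below are illustrative:

    SparkConf conf = new SparkConf()
            .set("spark.streaming.blockInterval", "100ms"); // smaller blocks => more tasks per batch

    JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.seconds(2)); // 2-second batch interval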

Upvotes: 0
