Reputation: 549
I am using the below code to read from a Kafka topic and process the data.
JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record))
        .transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() {
            JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();
            StructType schema = DataTypes.createStructType(fields);

            public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception {
                records = rdd.union(records);
                return rdd;
            }
        });
transformedMessages.foreachRDD(record -> {
    //System.out.println("Aman" + record.count());
    StructType schema = DataTypes.createStructType(fields);
    Dataset<Row> ds = ss.createDataFrame(records, schema);
    ds.createOrReplaceTempView("trades");
    System.out.println(ds.count());
    ds.show();
});
While running the code, I am getting the below exception:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
Given that I only have one DStream, I am not sure why I am getting this exception. I am reading from 3 partitions of a Kafka topic. I assume that createDirectStream will create 3 consumers to read the data.
Below is the code for the KafkaConsumer acquire method:
private void acquire() {
    this.ensureNotClosed();
    long threadId = Thread.currentThread().getId();
    if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    } else {
        this.refcount.incrementAndGet();
    }
}
Upvotes: 11
Views: 15870
Reputation: 1253
In this piece of code, you perform two actions (count() and show()) on the RDD:
transformedMessages.foreachRDD(record -> {
    //System.out.println("Aman" + record.count());
    StructType schema = DataTypes.createStructType(fields);
    Dataset<Row> ds = ss.createDataFrame(records, schema);
    ds.createOrReplaceTempView("trades");
    System.out.println(ds.count());
    ds.show();
});
Each action triggers its own read of the source, so two consumers from the same consumer group tried to read the same Kafka topic partition, but Kafka allows only one consumer from a given consumer group to read a topic partition at a time. The solution for this issue is to cache the data so Kafka is read only once:
transformedMessages.foreachRDD(record -> {
    //System.out.println("Aman" + record.count());
    StructType schema = DataTypes.createStructType(fields);
    Dataset<Row> ds = ss.createDataFrame(records, schema);
    ds.cache();
    System.out.println(ds.count());
    ds.show();
});
Upvotes: 0
Reputation: 377
This is a similar problem to java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access: you have more than one thread using the same consumer, and the Kafka consumer does not support multi-threaded access. Also make sure you are not using spark.speculation=true, since speculative execution launches duplicate copies of a task and will cause the error mentioned above.
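A minimal sketch of keeping speculation off when building the context (spark.speculation defaults to false, so this only matters if something in your setup enables it; the app name and batch interval are illustrative):

SparkConf conf = new SparkConf()
        .setAppName("KafkaStreamingApp")     // hypothetical app name
        .set("spark.speculation", "false");  // the default, stated explicitly
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));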
Upvotes: 1
Reputation: 2901
Spark 2.2.0 has a workaround that does not require caching. Just set spark.streaming.kafka.consumer.cache.enabled to false.
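A minimal sketch of setting that flag, assuming you do it on the SparkConf before creating the streaming context:

// Disable the per-executor KafkaConsumer cache so tasks don't share
// a cached consumer across threads.
SparkConf conf = new SparkConf()
        .set("spark.streaming.kafka.consumer.cache.enabled", "false");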
Take a look at this pull request.
Upvotes: 9
Reputation: 26
As described in this bug report: https://issues.apache.org/jira/browse/SPARK-19185, this is a known issue with the Spark/Kafka integration.
In my case, I am going to avoid using windowing, and instead use partitioning in combination with batchInterval and blockInterval, as described here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving. A sketch of that approach follows.
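A minimal sketch of the idea from that guide (the 200 ms block interval and 10-second batch interval are illustrative values, not from the original post; blockInterval applies to receiver-based input, while repartition works on any DStream):

// Blocks per batch = batchInterval / blockInterval, which sets task parallelism
// for receiver-based streams.
SparkConf conf = new SparkConf()
        .set("spark.streaming.blockInterval", "200ms"); // 200ms is the default
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// Alternatively, redistribute an existing stream across partitions:
JavaDStream<Row> repartitioned = transformedMessages.repartition(3);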
Upvotes: 0