lu_ferra

Reputation: 83

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

I have a Scala Spark Streaming application that receives data on the same topic from 3 different Kafka producers.

The Spark Streaming application runs on host 0.0.0.179, the Kafka broker on host 0.0.0.178, and the Kafka producers on hosts 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application, I get the error below:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

I have read thousands of different posts, but no one seems to have found a solution to this issue.

How can I handle this in my application? Do I have to modify some Kafka parameters (at the moment num.partitions is set to 1)?

Following is the code of my application:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse // or org.json4s.native.JsonMethods.parse, depending on the json4s backend

// Create the context with a 3 second batch interval
val sparkConf = new SparkConf()
  .setAppName("SparkScript")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.streaming.concurrentJobs", "3")
  .setMaster("local[4]")
val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(3))

case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)    


val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "0.0.0.178:9092",
    "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "group.id" -> "test_luca",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")

val s1 = KafkaUtils
  .createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
  .map { record =>
    implicit val formats = DefaultFormats
    parse(record.value).extract[Sensors1]
  }

s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()

Thank you

Upvotes: 2

Views: 9208

Answers (2)

kartik kudada

Reputation: 38

Using cache() worked for me. In my case, calling print, then a transformation, then print again on a JavaPairDStream was giving me that error. I added cache() just before the first print and it worked for me.

s1.print()
s1.saveAsTextFiles("results/", "")

The code below will work; I used similar code.

s1.cache();
s1.print();
s1.saveAsTextFiles("results/", "");
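
For the Scala code in the question, the same fix would look roughly like this (a sketch reusing the s1 stream defined there; DStream.cache() returns the stream itself, so the two output calls stay unchanged):

// Sketch: cache the mapped stream once, before the two output operations
s1.cache()
s1.print()
s1.saveAsTextFiles("results/", "")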

Upvotes: 0

Yuval Itzchakov

Reputation: 149538

Your problem is here:

s1.print()
s1.saveAsTextFiles("results/", "")

Spark creates a graph of flows, and you define two flows here:

Read from Kafka -> Print to console
Read from Kafka -> Save to text file

Spark will attempt to run both of these graphs concurrently, since they are independent of each other. And since the Kafka integration uses a cached consumer, both stream executions effectively try to use the same consumer, which is not safe for multi-threaded access.

What you can do is cache the DStream before running the two queries:

val dataFromKafka = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(/* stuff */)

val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
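
With the cache in place, each batch's RDD is materialized once and reused by both output operations, so they no longer both pull the same partitions from Kafka through the same cached consumer.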

Upvotes: 5
