K P

Reputation: 851

Spark Streaming Kafka job stuck in 'processing' stage

I have a streaming job that reads from Kafka (1-minute batch interval) and, after some operations, POSTs the results to an HTTP endpoint. Every few hours it gets stuck in the 'processing' stage and starts queueing batches from then on.

After examining the 'Executors' tab of the application UI, I found that only 1 of the 6 executors was showing 2 'Active Tasks'.


A thread dump of that executor showed 2 threads from the "Executor task launch worker" thread pool (source). Both threads were stuck at the same point.


Full stack trace:

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
java.net.InetAddress.checkLookupTable(InetAddress.java:1393)
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1310)
java.net.InetAddress.getAllByName0(InetAddress.java:1276)
java.net.InetAddress.getAllByName(InetAddress.java:1192)
java.net.InetAddress.getAllByName(InetAddress.java:1126)
java.net.InetAddress.getByName(InetAddress.java:1076)
java.net.InetSocketAddress.<init>(InetSocketAddress.java:220)
kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:151)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:69)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:209)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
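The trace shows the task thread blocked inside InetAddress while resolving the Kafka broker's host name (the InetSocketAddress constructor called from BlockingChannel.connect). One way to check for a resolver hang on a worker, independently of Spark and Kafka, is to time a lookup with a bounded wait. This is a minimal sketch, assuming it is run on the affected host with the broker's host name as the argument; the class name DnsProbe is hypothetical.

```java
import java.net.InetAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DnsProbe {
    // Resolves `host` on a separate daemon thread and waits at most `timeoutMs`.
    // Returns the elapsed time in ms, -1 if the lookup did not finish in time,
    // or -2 if the lookup failed (e.g. UnknownHostException).
    // A lookup blocked in native resolver code cannot be interrupted, so a
    // timed-out probe thread keeps running; being a daemon, it won't block JVM exit.
    public static long probe(String host, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "dns-probe");
            t.setDaemon(true);
            return t;
        });
        Callable<InetAddress[]> task = () -> InetAddress.getAllByName(host);
        long start = System.nanoTime();
        Future<InetAddress[]> f = pool.submit(task);
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (TimeoutException e) {
            return -1; // still resolving after timeoutMs: looks like a hang
        } catch (ExecutionException e) {
            return -2; // the lookup itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        long ms = probe(host, 5000);
        if (ms == -1) {
            System.out.println("lookup of " + host + " did not finish within 5 s");
        } else if (ms == -2) {
            System.out.println("lookup of " + host + " failed");
        } else {
            System.out.println("resolved " + host + " in " + ms + " ms");
        }
    }
}
```

Successful lookups may be served from the JVM's own address cache, so repeated probes of the same name within one JVM do not necessarily exercise the OS resolver each time.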

This looked like a JDK bug that should already have been fixed in JDK 7; I confirmed that I am running Java 1.8.0_101 (Oracle Corporation). I tried adding the following to the command line (as suggested here), but it didn't fix the issue:

-Djava.net.preferIPv4Stack=true -Dnetworkaddress.cache.ttl=60
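One possible reason the second flag had no effect: networkaddress.cache.ttl is a Java *security* property, not a system property, so passing it with -D does not set it. It can be set programmatically early in the JVM's life (before the first InetAddress lookup), or via the system-property equivalent -Dsun.net.inetaddr.ttl=60. A minimal sketch, with a hypothetical class name:

```java
import java.security.Security;

public class DnsCacheTtl {
    // networkaddress.cache.ttl is read from the security properties
    // (java.security file or Security.setProperty), not from -D flags.
    // Call this as early as possible, before any host name is resolved.
    public static void configure(int ttlSeconds) {
        Security.setProperty("networkaddress.cache.ttl", Integer.toString(ttlSeconds));
    }

    public static void main(String[] args) {
        configure(60);
        System.out.println("networkaddress.cache.ttl = "
                + Security.getProperty("networkaddress.cache.ttl"));
    }
}
```

In a Spark job the JVM flags also have to reach the executor JVMs, because the lookups in the trace run inside executor tasks; options passed only to the driver are not applied there. For example: `--conf "spark.executor.extraJavaOptions=-Djava.net.preferIPv4Stack=true -Dsun.net.inetaddr.ttl=60"`. Note that a cache TTL only changes how often names are re-resolved; it does not unblock a lookup that is already stuck.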

Does anyone have any ideas on an approach to debug/fix this?

Edit: Renamed the question to remove the misleading JDK reference.

Upvotes: 0

Views: 2269

Answers (1)

K P

Reputation: 851

It turned out to be a kernel-level bug (https://bugzilla.redhat.com/show_bug.cgi?id=1209433), which is resolved in Linux kernel 4.0.6. The hosts running my workers have RHEL with kernel 3.5.6. Hopefully, after deploying on newer CentOS machines with kernel 4.5, it won't be an issue.
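To audit which workers run a kernel older than the version cited above, each JVM's `os.version` can be compared numerically against 4.0.6. This is a rough sketch with a hypothetical class name; it assumes upstream version numbering. RHEL and CentOS kernels routinely carry backported fixes, so a lower version string alone does not prove the bug is present; the vendor's errata or the bug report is the authoritative source.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KernelCheck {
    // Upstream version cited as containing the fix (ignores vendor backports).
    static final String FIX_VERSION = "4.0.6";
    private static final Pattern VERSION =
            Pattern.compile("^(\\d+)(?:\\.(\\d+))?(?:\\.(\\d+))?");

    // Extracts major.minor.patch from strings like "3.10.0-327.el7.x86_64";
    // missing components become 0.
    static int[] parse(String v) {
        int[] out = new int[3];
        Matcher m = VERSION.matcher(v.trim());
        if (!m.find()) return out;
        for (int i = 0; i < 3; i++) {
            String g = m.group(i + 1);
            out[i] = (g == null) ? 0 : Integer.parseInt(g);
        }
        return out;
    }

    // Numeric comparison of the first three components: negative, zero or positive.
    public static int compare(String a, String b) {
        int[] x = parse(a), y = parse(b);
        for (int i = 0; i < 3; i++) {
            if (x[i] != y[i]) return Integer.compare(x[i], y[i]);
        }
        return 0;
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        String ver = System.getProperty("os.version");
        if (!os.startsWith("Linux")) {
            System.out.println("not Linux: " + os);
        } else if (compare(ver, FIX_VERSION) < 0) {
            System.out.println("kernel " + ver + " is numerically older than " + FIX_VERSION);
        } else {
            System.out.println("kernel " + ver + " is not older than " + FIX_VERSION);
        }
    }
}
```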

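I figured this out because every time, the stuck thread was in either 'checkLookupTable' or 'lookupAllHostAddr'. 'lookupAllHostAddr' goes into a native (JNI) call to the underlying OS resolver, and 'checkLookupTable' is where a thread waits while another thread is already resolving the same host name, so in both cases the thread is blocked on the OS rather than in Spark or Kafka code.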
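The same check done here by reading thread dumps can also be automated inside a JVM with `Thread.getAllStackTraces()`, for example from a background thread that logs periodically. This is a diagnostic sketch, not part of the original fix; the class name is hypothetical, and matching on these two method names is an assumption based on the frames seen above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StuckDnsFinder {
    // True for the frames observed in the hung threads: InetAddress.checkLookupTable
    // (waiting on another thread's lookup) or any lookupAllHostAddr frame
    // (the Java name-service wrapper or the native Inet4/Inet6AddressImpl method).
    public static boolean isDnsFrame(StackTraceElement e) {
        String m = e.getMethodName();
        return (e.getClassName().equals("java.net.InetAddress") && m.equals("checkLookupTable"))
                || m.equals("lookupAllHostAddr");
    }

    // Returns one "thread-name @ frame" entry per live thread currently in a DNS lookup.
    public static List<String> findThreadsInDnsLookup() {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (isDnsFrame(frame)) {
                    hits.add(entry.getKey().getName() + " @ " + frame);
                    break;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> stuck = findThreadsInDnsLookup();
        System.out.println(stuck.size() + " thread(s) currently in a DNS lookup");
        stuck.forEach(System.out::println);
    }
}
```

A thread showing up once is normal; the same thread appearing in consecutive samples over several minutes is the pattern described above.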

Upvotes: 2
