Reputation: 353
I'm trying to leverage the direct Kafka consumer (a new feature available in Python) to capture data from a custom Kafka producer that I'm running on localhost:9092.
I'm currently using direct_kafka_wordcount.py as provided by the Spark 1.6 example scripts.
DOCS: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
I'm using the following command to run the program:
~/spark-1.6.0/bin/spark-submit --jars
~/spark-1.6.0/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.6.0.jar
direct_kafka_wordcount.py localhost:9092 twitter.live
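For context, the core of that example script boils down to roughly the following (a minimal sketch of the Spark 1.6 Python API; the broker list and topic are the two command-line arguments above):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)  # 2-second batches

brokers, topic = "localhost:9092", "twitter.live"  # from the submit command above

# The stack trace below is raised from inside this call, while Spark fetches
# the initial offsets from the broker(s).
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

counts = kvs.map(lambda x: x[1]) \
            .flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()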
Unfortunately, I'm getting a strange error, which I'm not able to debug. Any tips/suggestions will be immensely appreciated.
py4j.protocol.Py4JJavaError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Upvotes: 5
Views: 7180
Reputation: 1011
I had a similar problem, but the solution turned out to be different: I had different Scala versions for Spark and the Kafka assembly jar.
Once I used the same version on both sides, PySpark was able to load the classes.
I used the following:
Spark: spark-1.6.3-bin-hadoop2.6.tgz
spark-streaming-kafka: spark-streaming-kafka-assembly_2.10-1.6.3.jar
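With the versions matched like this, the spark-submit invocation from the question would look roughly as follows (the jar path is illustrative; it depends on where the assembly jar was downloaded or built, and the prebuilt 1.6.x binaries are compiled against Scala 2.10, hence the _2.10 jar):

~/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar direct_kafka_wordcount.py localhost:9092 twitter.live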
Upvotes: 1
Reputation: 2977
The error:
java.nio.channels.ClosedChannelException
means the topic does not exist, the brokers are not reachable, or there is some network (proxy) issue.
Make sure there is no such connectivity problem by running kafka-console-consumer
on the Spark master and worker nodes.
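For example, something along these lines against the broker and topic from the question (the install path and the ZooKeeper address localhost:2181 are assumptions, and the flags depend on your Kafka version: the old 0.8.x console consumer takes --zookeeper, newer releases take --bootstrap-server):

# Kafka 0.8.x style consumer (the generation typically paired with Spark 1.6)
~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter.live --from-beginning

# Newer Kafka releases
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter.live --from-beginning

If this cannot read messages from every node, the problem is connectivity or topic configuration, not Spark.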
Upvotes: 6
Reputation: 737
In my case, a Spark Streaming job consuming a topic from Kafka hit this error and exited.
I checked metadata.broker.list and found that only one of the brokers had been added.
After adding all the brokers instead of just that one, everything worked, but I still got the warning org.apache.spark.SparkException: java.nio.channels.ClosedChannelException, so I checked the Kafka broker status in ZooKeeper and found that the one broker was down, which was causing the error.
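In PySpark terms, that means listing every broker in metadata.broker.list rather than a single one. A minimal sketch (broker hostnames are placeholders):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaBrokerList")
ssc = StreamingContext(sc, 2)

# List every broker, not just one; if the single broker named in
# metadata.broker.list is down, the offset lookup fails with ClosedChannelException.
kafka_params = {"metadata.broker.list": "broker1:9092,broker2:9092,broker3:9092"}
stream = KafkaUtils.createDirectStream(ssc, ["twitter.live"], kafka_params)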
Upvotes: 0