cynical biscuit

Reputation: 353

Direct Kafka Stream with PySpark (Apache Spark 1.6)

I'm trying to leverage the direct Kafka consumer (a new feature available in Python) to capture data from a custom Kafka producer that I'm running on localhost:9092.

I'm currently using "direct_kafka_wordcount.py" as provided by the Spark 1.6 example scripts.

Source: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py

DOCS: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
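
For reference, the core of that example boils down to creating a direct stream with KafkaUtils and running a word count over it. A minimal sketch (the app name and 2-second batch interval are my own choices, not prescribed anywhere):

    import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="DirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval (arbitrary choice)

    brokers, topic = sys.argv[1:]
    # Direct stream: no receiver; Spark tracks the Kafka offsets itself
    kvs = KafkaUtils.createDirectStream(ssc, [topic],
                                        {"metadata.broker.list": brokers})
    counts = kvs.map(lambda kv: kv[1]) \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()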

I'm using the following command to run the program:

    ~/spark-1.6.0/bin/spark-submit --jars \
      ~/spark-1.6.0/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.6.0.jar \
      direct_kafka_wordcount.py localhost:9092 twitter.live

Unfortunately, I'm getting a strange error, which I'm not able to debug. Any tips/suggestions will be immensely appreciated.

py4j.protocol.Py4JJavaError: An error occurred while calling o24.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

Upvotes: 5

Views: 7180

Answers (3)

chandank

Reputation: 1011

I had a similar problem, but it turned out to have a different solution: I was running different Scala versions for Spark and the Kafka assembly.

Once I used the same version on both sides, PySpark was able to load the classes.

I used the following:

    Spark: spark-1.6.3-bin-hadoop2.6.tgz
    spark-streaming-kafka: spark-streaming-kafka-assembly_2.10-1.6.3.jar
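
Put together, a spark-submit invocation with both sides on Scala 2.10 and Spark 1.6.3 would look roughly like this (the paths are examples only; point them at your own installation and assembly jar):

    # paths below are examples -- adjust to your installation
    ~/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --jars \
      ~/jars/spark-streaming-kafka-assembly_2.10-1.6.3.jar \
      direct_kafka_wordcount.py localhost:9092 twitter.live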

Upvotes: 1

Mohitt

Reputation: 2977

The error:

    java.nio.channels.ClosedChannelException

means that the topic does not exist, the brokers are not reachable, or there is some network (proxy) issue.

Make sure there is no such connectivity issue by running kafka-console-consumer on the Spark master and worker nodes.
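
For example, with the Kafka 0.8-era console consumer that this Spark version targets, something like the following run from each node should print messages if connectivity is fine (the ZooKeeper address and topic here are assumptions; substitute your own):

    # hostname, port, and topic are examples -- point at your own cluster
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
      --topic twitter.live --from-beginning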

Upvotes: 6

Matiji66

Reputation: 737

In my case, a Spark Streaming job consuming a topic from Kafka got this error and the program exited. I checked metadata.broker.list and found that only one of the brokers had been added. After adding all of the brokers, everything worked, but I still got the warning org.apache.spark.SparkException: java.nio.channels.ClosedChannelException. So I checked the Kafka broker status in ZooKeeper and found that one broker was down, which was what caused the error.
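
As a sketch of the metadata.broker.list point: given an existing StreamingContext ssc, make sure the parameter lists every broker in the cluster, not just one (the broker hostnames below are hypothetical placeholders):

    from pyspark.streaming.kafka import KafkaUtils

    # List ALL brokers in the cluster; these hostnames are placeholders.
    kafka_params = {"metadata.broker.list":
                    "broker1:9092,broker2:9092,broker3:9092"}
    stream = KafkaUtils.createDirectStream(ssc, ["twitter.live"], kafka_params)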

Upvotes: 0
