faustineinsun

Reputation: 451

Consume Kafka in Spark Streaming (Spark 2.0)

I found there are two methods to consume a Kafka topic in Spark Streaming (Spark 2.0):

1) using KafkaUtils.createDirectStream to get a DStream every k seconds, please refer to this document (a rough sketch of this approach is shown after this list)

2) using Kafka as a source for Structured Streaming, Spark 2.0's new feature: sqlContext.read.format("json").stream("kafka://KAFKA_HOST") to create an infinite DataFrame, related doc is here
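For context, method 1) with the DStream API looks roughly like the sketch below. The broker address, group id, and topic name are placeholders, and it assumes the spark-streaming-kafka-0-10 integration artifact is on the classpath:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder broker, group id, and topic -- adjust for your cluster
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "KAFKA_HOST:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// One micro-batch (DStream RDD) every 5 seconds
val ssc = new StreamingContext(new SparkConf().setAppName("DirectStreamExample"), Seconds(5))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("topic1"), kafkaParams)
)

// Print (key, value) pairs from each batch
stream.map(record => (record.key, record.value)).print()

ssc.start()
ssc.awaitTermination()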

Method 1) works, but 2) doesn't; I get the following error:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.DataFrameReader.stream(Ljava/lang/String;)Lorg/apache/spark/sql/Dataset;
...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My questions are:
What is "kafka://KAFKA_HOST" referring to?
How should I fix this problem?

Thank you in advance!

Upvotes: 3

Views: 1358

Answers (1)

Yuval Itzchakov

Reputation: 149518

Spark 2.0 doesn't yet support Kafka as a source of infinite DataFrames/Datasets. Support is planned for 2.1.

Edit: (6.12.2016)

Kafka 0.10 is now experimentally supported in Spark 2.0.2:

// Subscribe to a topic and load it as an unbounded, streaming DataFrame
val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// Kafka keys and values arrive as binary, so cast them to strings
ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
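To actually run the query, the streaming Dataset still needs a sink and a started query. A minimal sketch, assuming the console sink is sufficient for local testing and the spark-sql-kafka-0-10 artifact is on the classpath:

// Print each micro-batch to stdout; swap in a real sink for production use
val query = ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .writeStream
  .format("console")
  .start()

query.awaitTermination()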

Upvotes: 3
