evirac

Reputation: 63

KafkaUtils.createDirectStream gives an error

I changed a line from createStream to createDirectStream, since the new library does not support createStream.

I was following this guide: https://codewithgowtham.blogspot.com/2022/02/spark-streaming-kafka-cassandra-end-to.html

scala> val lines = KafkaUtils.createDirectStream(ssc, "localhost:2181", "sparkgroup", topicpMap).map(_._2)
    <console>:44: error: overloaded method value createDirectStream with alternatives:
      [K, V](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy, consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[K,V], perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[K,V]] <and>
      [K, V](ssc: org.apache.spark.streaming.StreamingContext, locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy, consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[K,V], perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[K,V]]
     cannot be applied to (org.apache.spark.streaming.StreamingContext, String, String, scala.collection.immutable.Map[String,Int])
           val lines = KafkaUtils.createDirectStream(ssc, "localhost:2181", "sparkgroup", topicpMap).map(_._2)
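
For reference, the overloads listed in the error come from the spark-streaming-kafka-0-10 integration, which no longer accepts a ZooKeeper address and a topic map; it takes a location strategy and a consumer strategy instead. A minimal sketch of that API (the broker address, topic name, and consumer settings below are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // Kafka broker, not ZooKeeper
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "sparkgroup",
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                        // existing StreamingContext
  PreferConsistent,
  Subscribe[String, String](Seq("topic1"), kafkaParams)
)
val lines = stream.map(_.value)               // replaces the old .map(_._2)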

Upvotes: 1

Views: 301

Answers (1)

Alex Ott

Reputation: 87349

It's already 2022; there should be a very specific reason for still using legacy Spark Streaming. Instead, use Spark Structured Streaming, which is much easier to work with than the legacy API. With it, reading from Kafka is very simple:

// create a streaming DataFrame from Kafka
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// Decode the payload - this heavily depends on the data format in Kafka;
// here key and value are treated as UTF-8 strings
import spark.implicits._   // needed for .as[(String, String)]
val decoded = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

You can use the same APIs to work with both streaming and batch data.
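
A minimal sketch of actually running that stream, assuming the decoded Dataset from above (the console sink and append mode here are just for illustration); the batch version is identical except that read/write replace readStream/writeStream:

// start the streaming query, printing decoded records to the console
val query = decoded.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()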

Upvotes: 1
