senthil kumar p

Reputation: 546

Reading multiple Kafka topics through Spark Structured Streaming is not working?

I am trying to read data from multiple Kafka topics using Structured Streaming. Kafka version: 2.12-1.0.0; Spark Structured Streaming: 2.2.1.

My code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("StreamLocallyExample")
  .config("spark.master", "local")
  .config("spark.sql.streaming.checkpointLocation", "path/XXYY")
  .getOrCreate()

// Subscribe to both topics in a single stream
val kafkaStreamDF = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test,trial")
  .load()

val df = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

val query = df.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()

In the above, I have two Kafka topics (test, trial). When I run this program, I can consume messages from the trial topic only; I am not able to consume messages from the test topic.

I am not getting any error and the program runs fine. Can anyone please help me?

Thanks!

Upvotes: 2

Views: 3463

Answers (1)

wandermonk

Reputation: 7346

As Maverik mentioned, please make sure all the topics are actually receiving data. In a single stream with multiple topics, the topics are read one by one, in sequence.
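If the sequential reading is the issue, one alternative (not from the original answer; a minimal sketch that reuses the broker address and topic names from the question, with made-up per-topic checkpoint paths) is to run one Structured Streaming query per topic, so neither topic waits on the other:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("PerTopicStreams")
  .config("spark.master", "local")
  .getOrCreate()

// One streaming query per topic; each query needs its own checkpoint location.
def streamFor(topic: String) =
  spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("checkpointLocation", s"checkpoints/$topic") // hypothetical path
    .start()

val queries = Seq("test", "trial").map(streamFor)

// Block until any of the running queries terminates
spark.streams.awaitAnyTermination()

Each query then consumes its topic independently, and awaitAnyTermination() keeps the driver alive while both run.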

Alternatively, you can use the configuration below to enable parallelism on the consumer side:

conf.set("spark.streaming.concurrentJobs", "2");

By default the number of concurrent jobs is 1, which means only one job is active at a time; until it finishes, other jobs are queued up even if resources are available and idle.

See http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming.

Running concurrent jobs reduces processing time and scheduling delay, even if a batch takes slightly longer than the batch interval to process. For example:

import java.util.*;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;

SparkConf conf = new SparkConf().setAppName("Streaming");
// Allow up to 2 streaming jobs to run at the same time
conf.set("spark.streaming.concurrentJobs", "2");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list",
        "broker1:9092,broker2:9092,broker3:9092");
Set<String> topics = new HashSet<>(Arrays.asList("test", "trial")); // topics from the question

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
        ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);

Source : http://why-not-learn-something.blogspot.in/2016/06/spark-streaming-performance-tuning-on.html

Upvotes: 1
