Jorge Cespedes

Spark Streaming 1.6 + Kafka: Too many batches in "queued" status

I'm using Spark Streaming to consume messages from a Kafka topic that has 10 partitions. I'm using the direct approach to consume from Kafka, and the code is below:

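import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Conf, SparkContextBuilder, HiveSQLContext, SchemaBuilder and createDomainObject
// are application-specific helpers (not shown).
def createStreamingContext(conf: Conf): StreamingContext = {
  val dateFormat = conf.dateFormat.apply
  val hiveTable = conf.tableName.apply

  val sparkConf = new SparkConf()
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.driver.allowMultipleContexts", "true")

  val sc = SparkContextBuilder.build(Some(sparkConf))
  val ssc = new StreamingContext(sc, Seconds(conf.batchInterval.apply))

  // Parameters for the Kafka 0.8 direct API (spark-streaming-kafka).
  // Keys and values are decoded by the StringDecoder type parameters below, so
  // new-consumer settings such as key.deserializer / enable.auto.commit are not used.
  val kafkaParams = Map[String, String](
    "bootstrap.servers" -> conf.kafkaBrokers.apply,
    "auto.offset.reset" -> "smallest"
  )

  val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    conf.topics.apply().split(",").toSet[String]
  )

  // Window over the raw Kafka stream (only the window length is given)
  val windowedKafkaStream = directKafkaStream.window(Seconds(conf.windowDuration.apply))
  ssc.checkpoint(conf.sparkCheckpointDir.apply)

  // Parse each comma-separated message value into a Row
  val eirRDD: DStream[Row] = windowedKafkaStream.map { kv =>
    val fields: Array[String] = kv._2.split(",")
    createDomainObject(fields, dateFormat)
  }

  // Append every windowed RDD to the Hive table, partitioned by year/month/day
  eirRDD.foreachRDD { rdd =>
    val schema = SchemaBuilder.build()
    val sqlContext: HiveContext = HiveSQLContext.getInstance(Some(rdd.context))
    val eirDF: DataFrame = sqlContext.createDataFrame(rdd, schema)

    eirDF
      .select(schema.map(c => col(c.name)): _*)
      .write
      .mode(SaveMode.Append)
      .partitionBy("year", "month", "day")
      .insertInto(hiveTable)
  }
  ssc
}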

As the code shows, I used a window for the following reason (please correct me if I'm wrong): since each batch ends with an insert into a Hive table, I want to avoid writing to HDFS too often. What I want is to hold enough data in memory and only then write it to the filesystem, and I thought a window would be the right way to achieve that.
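To illustrate how `window()` behaves, here is a small pure-Scala model (not Spark code) of which micro-batches each window output contains. It follows the DStream API: `window(windowDuration)` produces a new window every batch interval, while `window(windowDuration, slideDuration)` produces one every `slideDuration`. The numbers (10 s batches, 30 s window, 60 s of data) are hypothetical, as are the `WindowModel` names. With the one-argument form, each batch lands in several consecutive windows, so writing every window to Hive would insert the same records more than once; with `slideDuration` equal to `windowDuration` (a tumbling window), each batch lands in exactly one window.

```scala
// A simplified model of DStream windowing (no Spark dependency).
// Batch k (k = 1, 2, ...) is generated at time k * batchSecs. A window is emitted
// at every multiple of slideSecs and covers the batches with times in (t - windowSecs, t].
object WindowModel {
  def windows(batchSecs: Int, windowSecs: Int, slideSecs: Int, totalSecs: Int): Seq[Seq[Int]] = {
    // Spark likewise requires both durations to be multiples of the batch interval
    require(windowSecs % batchSecs == 0 && slideSecs % batchSecs == 0,
      "window and slide durations must be multiples of the batch interval")
    val batchTimes = batchSecs to totalSecs by batchSecs
    (slideSecs to totalSecs by slideSecs).map { t =>
      batchTimes.filter(bt => bt > t - windowSecs && bt <= t).map(_ / batchSecs)
    }
  }

  // Number of emitted windows that contain a batch, i.e. how often it would be written
  def timesWritten(batch: Int, ws: Seq[Seq[Int]]): Int = ws.count(_.contains(batch))
}

// Sliding: like window(Seconds(30)) on 10 s batches (slide = batch interval)
val sliding = WindowModel.windows(batchSecs = 10, windowSecs = 30, slideSecs = 10, totalSecs = 60)
// Tumbling: like window(Seconds(30), Seconds(30))
val tumbling = WindowModel.windows(batchSecs = 10, windowSecs = 30, slideSecs = 30, totalSecs = 60)

println(sliding)
println(tumbling)
println(WindowModel.timesWritten(3, sliding))  // 3
println(WindowModel.timesWritten(3, tumbling)) // 1
```

In the question's code, the tumbling variant would be `directKafkaStream.window(Seconds(w), Seconds(w))` with `w = conf.windowDuration.apply`, so `foreachRDD` would run once per window instead of once per batch.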

In the image below, you can see that many batches are queued and the batch being processed takes forever to complete.

[Screenshot: only one batch is in "processing" status; the others stay queued indefinitely]

I'm also providing the details of the single batch being processed:

[Screenshot: the insert into step generates thousands of tasks]

Why does the insert action generate so many tasks when there are only a few events in the batch? Even a batch with 0 events sometimes generates thousands of tasks that take forever to complete.
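A back-of-the-envelope sketch of one plausible contributor, not a confirmed diagnosis. It assumes that with the direct approach each batch's RDD has one partition per Kafka partition (10 here), even when a partition received no new messages, and that a windowed DStream unions the RDDs of all batches in the window. Under those assumptions, the RDD written in `foreachRDD` has about (window length / batch interval) × 10 partitions, i.e. that many tasks in the insert stage, independent of the number of events. The batch and window lengths below are hypothetical, since the question does not state them, and `TaskEstimate` is an illustrative name.

```scala
// Rough task count for one write of the windowed stream.
// Assumptions: one RDD partition per Kafka partition per batch (direct approach),
// and the window RDD is a plain union of the per-batch RDDs it covers.
object TaskEstimate {
  def tasksPerWrite(kafkaPartitions: Int, batchSecs: Int, windowSecs: Int): Int = {
    require(windowSecs % batchSecs == 0, "window must be a multiple of the batch interval")
    val batchesInWindow = windowSecs / batchSecs
    // Partitions with no new messages are assumed to still count, so the event count is irrelevant
    kafkaPartitions * batchesInWindow
  }
}

// Hypothetical settings: 10 Kafka partitions, 10 s batches, 10 min window
println(TaskEstimate.tasksPerWrite(kafkaPartitions = 10, batchSecs = 10, windowSecs = 600))  // 600
// Same partitions with a 1 h window
println(TaskEstimate.tasksPerWrite(kafkaPartitions = 10, batchSecs = 10, windowSecs = 3600)) // 3600
```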

Is the way I process micro-batches with Spark wrong?

Thanks for your help!

Some extra details:

YARN containers have a maximum of 2 GB. In this YARN queue, the maximum number of containers is 10. When I look at the details of the queue where this Spark application runs, the number of pending containers is extremely large: around 15k.

Answers (1)

Jorge Cespedes

Well, I finally figured it out. Apparently Spark Streaming does not cope well with empty micro-batches, so inside the foreachRDD block I added the following check:

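eirRDD.foreachRDD { rdd =>
  // take(1) returns at most one element; an empty array means the batch has no data
  if (rdd.take(1).length != 0) {
    // build the DataFrame and insertInto(hiveTable) as in the question
  }
}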

That way, empty micro-batches are skipped. In my case, the rdd.isEmpty() method did not work.

Hope this helps somebody else! ;)
