smeeb

Reputation: 29567

Serial consumption of Kafka topics from Spark

Given the following code:

def createKafkaStream(ssc: StreamingContext, 
                      kafkaTopics: String, brokers: String): DStream[(String, String)] = {
    // Some configs here (at minimum, the broker list and topic set):
    val topicsSet = kafkaTopics.split(",").toSet
    val props = Map("metadata.broker.list" -> brokers)
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.foreach { msg =>
            // Now do some DataFrame-intensive work.
            // As I understand things, DataFrame ops must be run
            // on Workers as well as streaming consumers.
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

My understanding is that Spark and Kafka will automagically work together to figure out how many consumer threads to deploy to available Worker Nodes, which likely results in parallel processing of messages off a Kafka topic.

But what if I don't want multiple, parallel consumers? What if I want 1-and-only-1 consumer reading the next message from a topic, processing it completely, and then starting back over again and polling for the next message?

Also, when I call:

val ssc = new StreamingContext(sc, Seconds(10))

Does this mean that a single consumer thread will receive all messages that were published to the topic in the last 10 seconds, or that a single consumer thread will receive the next (single) message from the topic, and that it will poll for the next message every 10 seconds?

Upvotes: 1

Views: 1012

Answers (1)

Yuval Itzchakov

Reputation: 149618

But what if I don't want multiple, parallel consumers? What if I want 1-and-only-1 consumer reading the next message from a topic, processing it completely, and then starting back over again and polling for the next message?

If that is your use-case, I'd say why use Spark at all? Its entire advantage is that you can read in parallel. The only hacky workaround I can think of is creating a Kafka topic with a single partition, which would make Spark assign the entire offset range to a single worker, but that is ugly.
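
If you do go the single-partition route, here's a rough sketch of what the serial processing could look like, assuming "someTopic" was created with exactly one partition and reusing the createKafkaStream helper from the question:

createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD { rdd =>
    // The direct stream maps Kafka partitions to Spark partitions 1:1, so a
    // single-partition topic yields an RDD with exactly one partition
    // (rdd.partitions.length == 1) and the whole batch runs as a single task.
    rdd.foreachPartition { messages =>
        messages.foreach { case (key, value) =>
            // Process each message completely before pulling the next one;
            // within a partition, messages arrive in offset order.
        }
    }
}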

Does that mean that a single consumer thread will receive all messages that were published to the topic in the last 10 seconds or that a single consumer thread will receive the next (single) message from the topic, and that it will poll for the next message every 10 seconds?

Neither. Since you're using the direct (receiverless) stream approach, every 10 seconds your driver will ask Kafka for the offset ranges that have changed since the last batch, for each partition of the topic. Spark then takes each such offset range and sends it to one of the workers to consume directly from Kafka. This means that with the direct stream approach there is a 1:1 correspondence between Kafka partitions and Spark partitions.
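
You can see that mapping on every batch: the RDDs produced by the direct stream expose the Kafka offset ranges they were built from via the HasOffsetRanges interface of the spark-streaming-kafka 0.8 integration. A small sketch, again using the placeholder topic and broker from the question:

import org.apache.spark.streaming.kafka.HasOffsetRanges

createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD { rdd =>
    // One OffsetRange per Kafka partition, and one Spark partition per
    // OffsetRange.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
        println(s"topic=${o.topic} partition=${o.partition} " +
                s"offsets=[${o.fromOffset}, ${o.untilOffset})")
    }
}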

Upvotes: 2
