pratik rudra

Reputation: 41

How to consume JSON records from Kafka using Spark Streaming and Python?

I have created a Kafka topic with records in JSON format.

I am able to consume these JSON strings using kafka-console-consumer.sh:

./kafka-console-consumer.sh --new-consumer \
    --topic test \
    --from-beginning \
    --bootstrap-server host:9092 \
    --consumer.config /root/client.properties

How could I do this using Spark Streaming in Python?

Upvotes: 1

Views: 3577

Answers (1)

Jacek Laskowski

Reputation: 74619

Doh, why Python and not Scala?! Your home exercise, then, will be to rewrite the code below in Python ;-)

From Advanced Sources:

As of Spark 2.1.1, out of these sources, Kafka, Kinesis and Flume are available in the Python API.

Basically, the process is to:

Read messages from the Kafka topic using the spark-streaming-kafka-0-10_2.11 library, as described in the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher), with KafkaUtils.createDirectStream.
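Note that the library has to be on your classpath first; in sbt that would be declared roughly as follows (the version shown is an assumption, so match it to the Spark version you actually run):

// Assumed version -- align with the Spark version on your cluster
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.1"

Alternatively, pass the artifact to spark-submit via --packages.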

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
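The snippet assumes a streamingContext is already in scope. A minimal sketch of creating one (the application name and the 5-second batch interval are placeholder choices):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name and batch interval -- tune both for your workload
val conf = new SparkConf().setAppName("kafka-json-consumer")
val streamingContext = new StreamingContext(conf, Seconds(5))

Once the stream and its transformations are wired up, call streamingContext.start() and streamingContext.awaitTermination() to actually run the job.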

Extract the values from the ConsumerRecords using the map operator, so you won't face serialization issues (ConsumerRecord itself is not serializable).

stream.map(record => (record.key, record.value))

If you don't send keys, just record.value is enough:

stream.map(record => record.value)

Transform the string messages to JSON. Once you have the values, use the from_json function:

from_json(e: Column, schema: StructType) Parses a column containing a JSON string into a StructType with the specified schema. Returns null, in the case of an unparseable string.

Putting it together inside foreachRDD, the code will look as follows (spark below is your active SparkSession, and jsonSchema is the StructType describing your records):

import org.apache.spark.sql.functions.from_json

stream.map(record => record.value).foreachRDD { rdd =>
  import spark.implicits._  // required for toDF and the 'value column syntax
  rdd.toDF("value").
    withColumn("json", from_json('value, jsonSchema)).
    select("json.*").
    show(false)
}
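jsonSchema isn't defined above; it's the StructType matching your records. A hypothetical example, assuming records shaped like {"id": 1, "name": "foo"}:

import org.apache.spark.sql.types._

// Hypothetical schema -- replace the fields with those of your actual JSON records
val jsonSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))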

Done!

Upvotes: 3
