Val

Reputation: 355

How do I consume a Kafka topic inside a Spark Streaming app?

When I create a stream from a Kafka topic and print its content

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

I get an empty result

    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

Meanwhile, it works in the console:

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

correctly gives me all the lines of my text in the Kafka topic:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

What is the proper way to stream data from a Kafka topic into a Spark Streaming app?

Upvotes: 1

Views: 3012

Answers (3)

Based on your code, we can't print the streaming RDD directly; we should print it using foreachRDD. DStream.foreachRDD is an "output operator" in Spark Streaming. It allows you to access the underlying RDDs of the DStream and execute actions that do something practical with the data.

What's the meaning of DStream.foreachRDD function?

Note: you can also achieve this with Structured Streaming. Ref: Pyspark Structured streaming processing

Sample working code: this code tries to read messages from a Kafka topic and print them. You can change it based on your requirements.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    def handler(message):
        # collect this micro-batch on the driver and print the message values
        records = message.collect()
        for record in records:
            print(record[1])

    def main():
        sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
        ssc = StreamingContext(sc, 10)

        # keys and values are decoded as UTF-8 strings by the default decoders
        kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9192'})
        kvs.foreachRDD(handler)

        ssc.start()
        ssc.awaitTermination()

    if __name__ == "__main__":
        main()
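One caveat: handler collects the whole micro-batch to the driver, which is fine for a small test topic. If you only want to peek at a few records per batch, a small variation is to use take instead of collect, for example:

    def handler(message):
        # pull at most 10 records of this micro-batch to the driver and print the values
        for record in message.take(10):
            print(record[1])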

Upvotes: 1

wypul

Reputation: 837

The reason that you are not seeing any data in the streaming output is that Spark Streaming starts reading from the latest offset by default. So if you start your Spark Streaming application first and then write data to Kafka, you will see output in the streaming job. Refer to the documentation here:

By default, it will start consuming from the latest offset of each Kafka partition

But you can also read data from any specific offset of your topic. Take a look at the createDirectStream method here. It takes a fromOffsets parameter where you can specify the starting offset for each partition as a dictionary.

I have tested the below code with Kafka 2.2.0, Spark 2.4.3 and Python 3.7.3:

Start pyspark shell with kafka dependencies:

    pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

Run below code:

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 1)

    # start reading topic 'test' from offset 0 of partition 0
    topicPartition = TopicAndPartition('test', 0)
    fromOffset = {topicPartition: 0}

    lines = KafkaUtils.createDirectStream(ssc, ['test'], {"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()
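Note that the direct stream yields (key, value) tuples, so pprint shows both. If you only care about the message payloads, one small variation is to map to the value before printing, for example:

    # keep only the message value from each (key, value) pair before printing
    lines.map(lambda kv: kv[1]).pprint()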

Also, you should consider using Structured Streaming instead of Spark Streaming if you have Kafka broker version 0.10 or higher. Refer to the Structured Streaming documentation here and Structured Streaming with Kafka integration here.

Below is a sample code to run with Structured Streaming. Please use the jar version according to your Kafka and Spark versions. I am using Spark 2.4.3 with Scala 2.11 and Kafka 0.10, so I use the jar spark-sql-kafka-0-10_2.11:2.4.3.

Start pyspark shell:

    pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3

Run below code:

    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("console") \
      .start()
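If you want to start from a specific offset here as well, the startingOffsets option also accepts a JSON string with per-partition offsets, analogous to fromOffsets above; the topic name and offset below are just illustrative:

    # start reading topic "test" from offset 0 of partition 0
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", """{"test":{"0":0}}""") \
      .load()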

Upvotes: 0

c.guzel

Reputation: 301

I recommend using Spark Structured Streaming. It's the new-generation streaming engine that comes with the release of Spark 2. You can check it in this link.

For Kafka integration, you can look at the docs at this link.
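For reference, below is a minimal self-contained sketch of what the Kafka integration from the linked docs looks like in PySpark; the topic name, broker address and package version are assumptions, so adjust them to your setup:

    # run with, e.g.:
    # spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 app.py
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("StructuredKafkaRead").getOrCreate()

    # subscribe to the topic and read it from the beginning
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "sample_topic") \
        .option("startingOffsets", "earliest") \
        .load()

    # Kafka keys and values are binary; cast them to strings for the console sink
    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .format("console") \
        .start()

    query.awaitTermination()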

Upvotes: 0
