艾瑪艾瑪艾瑪


How do I direct-stream a JSON file from Kafka in Spark and convert it into an RDD?

I wrote code that runs a word count over a Kafka direct stream when a file is sent through the producer:

import sys
from operator import add

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Note: pyspark.streaming.kafka (the Kafka 0.8 integration) exists in Spark 1.x/2.x;
# it was removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils

## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
BATCH_INTERVAL_SECONDS = 2

def main():
    if len(sys.argv) != 3:
        print("Usage: %s <broker_list> <topic>" % sys.argv[0], file=sys.stderr)
        sys.exit(-1)
    brokers, topic = sys.argv[1:]

    sc = SparkContext(appName=APP_NAME)
    ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

    # Each Kafka record arrives as a (key, value) tuple
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])            # keep only the message value
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(add)                      # sum the 1s per word
    counts.pprint()                            # print the first counts of each batch

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
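For readers without a Kafka cluster at hand, here is a plain-Python sketch (no Spark involved) of what the pipeline above computes for a single micro-batch. It assumes, as createDirectStream does, that each Kafka record arrives as a (key, value) tuple; the name word_count_batch and the sample records are hypothetical.

```python
from collections import Counter
from typing import Iterable, Optional, Tuple


def word_count_batch(records: Iterable[Tuple[Optional[str], str]]) -> dict:
    """Count words in one batch of (key, value) records (hypothetical helper).

    Mirrors: kvs.map(lambda x: x[1]).flatMap(split(" ")).map((w, 1)).reduceByKey(add)
    """
    counts = Counter()
    for _key, value in records:          # map(lambda x: x[1]): keep only the message value
        for word in value.split(" "):    # flatMap: one line becomes many words
            counts[word] += 1            # map to (word, 1), then reduceByKey(add)
    return dict(counts)


batch = [(None, "hello spark"), (None, "hello kafka")]
print(word_count_batch(batch))
```

Like the Spark version, split(" ") keeps empty strings when a line contains consecutive spaces, so "" can show up as a "word"; split() with no argument would avoid that in both versions.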

I need to convert the incoming JSON data into a Spark DataFrame using the DStream.

Upvotes: 0

Views: 910

Answers (1)

Isaac Amezcua


This should work:

Once you have the DStream kvs, map it to extract the value of each Kafka record (records arrive as (key, value) tuples), then pass each resulting RDD to a handler function with foreachRDD:

data = kvs.map(lambda record: record[1])    # keep the message value of each (key, value) record
data.foreachRDD(readMyRddsFromKafkaStream)  # called once per batch with that batch's RDD
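Conceptually, foreachRDD calls the handler once per batch interval on the driver, handing it that batch's RDD, and it does so even when the batch is empty. The plain-Python sketch below imitates that control flow with lists standing in for RDDs; run_stream and the sample batches are hypothetical names used only for illustration.

```python
from typing import Callable, List, Optional, Tuple

Record = Tuple[Optional[str], str]


def run_stream(batches: List[List[Record]], handler: Callable[[List[str]], None]) -> None:
    """Imitate: data = kvs.map(lambda r: r[1]); data.foreachRDD(handler) (hypothetical)."""
    for batch in batches:                    # one iteration per batch interval
        values = [r[1] for r in batch]       # the map step: keep the Kafka message value
        handler(values)                      # foreachRDD: the handler also sees empty batches


seen = []
run_stream([[(None, '{"id": 1}')], []], seen.append)
print(seen)
```

Because empty batches still reach the handler, it is worth checking for an empty RDD before trying to build a DataFrame from it.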

Then define the handler function, which creates a DataFrame from your JSON data:

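from pyspark.sql import SparkSession

def readMyRddsFromKafkaStream(readRdd):
    # Skip empty batches: there is no JSON to infer a schema from
    if readRdd.isEmpty():
        return
    # Reuse (or lazily create) a SparkSession on the RDD's SparkContext
    spark = SparkSession.builder.config(conf=readRdd.context.getConf()).getOrCreate()
    # Put the RDD of JSON strings into a DataFrame
    df = spark.read.json(readRdd)
    # registerTempTable is deprecated; createOrReplaceTempView replaces it
    df.createOrReplaceTempView("temporary_table")
    df = spark.sql("""
        SELECT *
        FROM temporary_table
    """)
    df.show()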
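spark.read.json infers a schema by scanning the JSON strings and merging the fields it finds. The plain-Python sketch below is a simplified approximation of that idea, assuming one JSON object per Kafka message: it takes the union of keys (sorted by name, as Spark's inferred JSON schema typically is) and fills absent fields with None, the way Spark shows them as null. It ignores type inference, and unlike Spark's default PERMISSIVE mode (which keeps malformed input in a _corrupt_record column) it simply skips lines that fail to parse. json_batch_to_rows is a hypothetical name.

```python
import json
from typing import Any, Dict, List, Tuple


def json_batch_to_rows(values: List[str]) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Return (column names, rows) for one batch of JSON strings (hypothetical helper)."""
    parsed = []
    for value in values:
        try:
            obj = json.loads(value)
        except json.JSONDecodeError:
            continue                     # skip malformed input (Spark would keep it in _corrupt_record)
        if isinstance(obj, dict):        # assumption: one JSON object per message
            parsed.append(obj)
    columns = sorted({key for obj in parsed for key in obj})          # union of all fields
    rows = [{col: obj.get(col) for col in columns} for obj in parsed]  # missing field -> None
    return columns, rows


cols, rows = json_batch_to_rows(['{"name": "a", "age": 1}', '{"name": "b"}', "not json"])
print(cols)
print(rows)
```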

Hope it helps my friends :)

Upvotes: 1
