SUNIL KUMAR C

Reputation: 61

Unable to create a DataFrame from a JSON DStream using PySpark

I am trying to create a DataFrame from JSON records in a DStream, but the code below does not produce the DataFrame I expect:

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

if __name__ == "__main__":
    if len(sys.argv) != 3:
        raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")

# Initialize a SparkContext with a name
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
# Create a StreamingContext with a batch interval of 2 seconds
stc = StreamingContext(spc, 2)
# Checkpointing feature
stc.checkpoint("checkpoint")
# Creating a DStream to connect to hostname:port (like localhost:9999)
lines = stc.socketTextStream(sys.argv[1], int(sys.argv[2]))
lines.pprint()
parsed = lines.map(lambda x: json.loads(x))
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = sqlContext.createDataFrame(rowRdd)
        # Register as table
        wordsDataFrame.registerTempTable("mytable")
        testDataFrame = sqlContext.sql("select summary from mytable")
        print(testDataFrame.show())
        print(testDataFrame.printSchema())
    except:
        pass
parsed.foreachRDD(process)
stc.start()
# Wait for the computation to terminate
stc.awaitTermination()

The script reports no errors and reads the JSON from the streaming context successfully, but it prints neither the values in summary nor the DataFrame schema.
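
One thing worth checking, sketched here in plain Python on the assumption that the script is run exactly as posted: Row is used inside process but never imported, and the bare except / pass discards whatever exception is raised, so nothing would be printed. The function names process_silent and process_logged are hypothetical stand-ins for process, and no Spark is needed to see the effect.

```python
def process_silent(records):
    """Mirrors the posted pattern: any failure is swallowed silently."""
    try:
        # Row is deliberately NOT imported here, as in the posted script,
        # so this line raises NameError for a non-empty input.
        return [Row(word=r) for r in records]
    except:
        pass  # the error disappears; the function just returns None


def process_logged(records):
    """Same body, but the exception is reported instead of discarded."""
    try:
        return [Row(word=r) for r in records]
    except Exception as exc:
        # Return a short description so the caller can see what failed.
        return "%s: %s" % (type(exc).__name__, exc)


print(process_silent(["good"]))  # None
print(process_logged(["good"]))
```

Replacing the bare except with a handler that prints the exception should at least show why show() and printSchema() are never reached.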

Example JSON I am attempting to read:

{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", "reviewerName": "cassandra tu \"Yeah, well, that's just like, u...", "helpful": [0, 0], "reviewText": "Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", "overall": 5.0, "summary": "good", "unixReviewTime": 1393545600, "reviewTime": "02 28, 2014"}
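
For reference, a minimal plain-Python sketch of what json.loads returns for a line like this. The record is shortened to a few of the fields above, and to_record is a hypothetical helper, not part of the posted script. Since each parsed element is already a dict keyed by field name, wrapping the whole dict as Row(word=w) would presumably yield a single column named word rather than a summary column; building the row from the dict's keys (for example Row(**record) in PySpark) is one option, untested here.

```python
import json

# Shortened copy of the sample record (long text fields omitted).
line = ('{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", '
        '"helpful": [0, 0], "overall": 5.0, "summary": "good", '
        '"unixReviewTime": 1393545600, "reviewTime": "02 28, 2014"}')


def to_record(text):
    """Parse one JSON line; return None for malformed or non-object input."""
    try:
        obj = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    return obj if isinstance(obj, dict) else None


record = to_record(line)
print(record["summary"])  # good
```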

I am an absolute newcomer to Spark Streaming and have started working on pet projects by reading the documentation. Any help and guidance is greatly appreciated.

Upvotes: 5

Views: 1037

Answers (0)