analyticalpicasso

Reputation: 1993

pyspark streaming twitter json to DF

I am working on integrating Spark Streaming with Twitter using the Python API. Most examples, code snippets, and blogs I have seen take only a few columns from the Twitter JSON for their final manipulation. For my use case, however, I need all fields of the Twitter JSON converted to a DataFrame. This is where I am facing the problem: sqlContext.read.json() is dumping the whole JSON DStream into _corrupt_record:

+--------------------+
|     _corrupt_record|
+--------------------+
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|

Also, it seems this problem could be solved with Structured Streaming in Spark 2.x, but I have to stick to Spark 1.6. The following is my code snippet:

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:

        sqlContext = getSqlContextInstance(rdd.context)

        jsonRDD = sqlContext.read.json(rdd)
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass


rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
# rawTweet[1] is the Kafka message value, i.e. the tweet's JSON string
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))

parsed_stream.foreachRDD(process)

Upvotes: 1

Views: 1698

Answers (2)

analyticalpicasso

Reputation: 1993

json.loads() in the map step produces Python dictionaries, so the stream carries an RDD of type RDD[dict]. To turn it into a DataFrame, serialize each dict back to a JSON string and feed that to jsonRDD:

sqlContext.jsonRDD(rdd.map(lambda x: json.dumps(x)))
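
The re-serialization matters because, without it, Spark stringifies each dictionary with its Python repr, which is not valid JSON. A minimal sketch of the difference, in plain Python 2 (no Spark needed):

import json

tweet = {u'quote_count': 0, u'text': u'hello'}

# What read.json saw -- single quotes and u prefixes, not valid JSON:
str(tweet)         # "{u'quote_count': 0, u'text': u'hello'}"

# What jsonRDD needs -- a proper JSON string:
json.dumps(tweet)  # '{"quote_count": 0, "text": "hello"}'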

To make it work in my case, I had to do the following:

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:

        sqlContext = getSqlContextInstance(rdd.context)

        # serialize each dict back to a JSON string before parsing
        jsonRDD = sqlContext.jsonRDD(rdd.map(lambda x: json.dumps(x)))
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass


rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))

parsed_stream.foreachRDD(process)
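
Once each batch is registered as the tweets temp table, it can be queried with plain SQL inside process. A hedged sketch (quote_count comes from the payload above; text is a standard tweet field):

# Query the per-batch temp table; adjust fields to your payload.
top_quoted = sqlContext.sql(
    "SELECT text, quote_count FROM tweets ORDER BY quote_count DESC LIMIT 10")
top_quoted.show()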

For more detail on this approach, refer to the link.

Upvotes: 3

Alper t. Turker

Reputation: 35249

Either skip parsing in Python:

# pass the raw JSON string from Kafka straight through; read.json parses it
rawKafkaStream.map(lambda rawTweet: rawTweet[1]).foreachRDD(process)

or don't use the JSON reader:

def process(time, rdd):
    try:
        sqlContext = getSqlContextInstance(rdd.context)
        # May or may not work correctly without schema (see below)
        jsonRDD = sqlContext.createDataFrame(rdd)
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass
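
Note that inferring a schema from an RDD of plain dicts is deprecated in Spark 1.x (the warning suggests pyspark.sql.Row instead), which is one reason the call above may misbehave. A sketch of the Row-based variant, assuming flat top-level fields:

from pyspark.sql import Row

# Convert each dict to a Row so createDataFrame can infer a schema
# (flat fields only; nested objects need their own conversion).
jsonRDD = sqlContext.createDataFrame(rdd.map(lambda d: Row(**d)))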

In general, with both methods you should provide a schema:

from pyspark.sql.types import *

schema = StructType([...])
jsonRDD = sqlContext.read.schema(schema).json(rdd)

and

jsonRDD = sqlContext.createDataFrame(rdd, schema)

respectively.
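
For illustration, a partial schema for a few standard tweet fields might look like this (quote_count appears in the output above; the other field names are assumed from the usual Twitter payload):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Partial schema; extend with whichever remaining tweet fields you need.
# Fields absent from the schema are dropped; fields missing from a
# record come back as null.
schema = StructType([
    StructField("id_str", StringType(), True),
    StructField("text", StringType(), True),
    StructField("quote_count", LongType(), True),
])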

Upvotes: 2
