Reputation: 1993
I am working on integrating Spark Streaming with Twitter using the Python API. Most of the examples, code snippets, and blogs I have seen take only a few columns from the Twitter JSON for their final manipulation. For my use case, however, I need all the fields of the Twitter JSON converted to a dataframe. This is where I am facing the problem: sqlContext.read.json() is dumping the whole JSON DStream into _corrupt_record:
+--------------------+
|     _corrupt_record|
+--------------------+
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
|{u'quote_count': ...|
+--------------------+
Also, it seems this problem could be solved with Structured Streaming in Spark 2+, but I have to stick to Spark 1.6. Following is my code snippet:
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        sqlContext = getSqlContextInstance(rdd.context)
        jsonRDD = sqlContext.read.json(rdd)
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass

rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))
parsed_stream.foreachRDD(process)
Upvotes: 1
Views: 1698
Reputation: 1993
Python json.loads() creates an RDD of dictionaries (RDD[dict]) in Spark. To make it into a DF, serialize each dict back to a JSON string with json.dumps() and feed the result to jsonRDD():

SQLContext.jsonRDD(RDD[dict].map(lambda x: json.dumps(x)))
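This also explains the _corrupt_record in the question: read.json() expects an RDD of JSON strings, and when handed dicts it ends up with each element's Python repr, which is not valid JSON. A minimal sketch of the difference (plain Python 2, no Spark needed):

import json

record = json.loads('{"quote_count": 0}')  # Kafka message -> Python dict
str(record)         # "{u'quote_count': 0}" -- dict repr, not valid JSON,
                    # hence the _corrupt_record column above
json.dumps(record)  # '{"quote_count": 0}'  -- valid JSON again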
To make it work in my case, I had to do the following:
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        sqlContext = getSqlContextInstance(rdd.context)
        jsonRDD = sqlContext.jsonRDD(rdd.map(lambda x: json.dumps(x)))
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass

rawKafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "kafka-consumer", {kafkaTopic: 4})
parsed_stream = rawKafkaStream.map(lambda rawTweet: json.loads(rawTweet[1]))
parsed_stream.foreachRDD(process)
For more detail on this approach, refer to the link.
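Note: SQLContext.jsonRDD was deprecated in Spark 1.4 in favour of the reader API, so on Spark 1.6 the equivalent, non-deprecated line should be:

jsonRDD = sqlContext.read.json(rdd.map(lambda x: json.dumps(x)))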
Upvotes: 3
Reputation: 35249
Either skip parsing in Python, so the DStream keeps the raw JSON strings that read.json expects:
rawKafkaStream.map(lambda rawTweet: rawTweet[1]).foreachRDD(process)
or don't use the JSON reader at all:
def process(time, rdd):
    try:
        sqlContext = getSqlContextInstance(rdd.context)
        # May or may not work correctly without a schema (see below)
        jsonRDD = sqlContext.createDataFrame(rdd)
        jsonRDD.registerTempTable("tweets")
        jsonRDD.printSchema()
    except:
        pass
In general, with both methods you should provide a schema:
from pyspark.sql.types import *
schema = StructType([...])
jsonRDD = sqlContext.read.schema(schema).json(rdd)
and
jsonRDD = sqlContext.createDataFrame(rdd, schema)
respectively.
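For illustration, a minimal, hypothetical schema covering only the one field visible in the question's output (a real Twitter payload has many more fields you would add):

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Hypothetical stub: quote_count appears in the question's output;
# "text" is assumed here -- extend with the remaining Twitter fields.
schema = StructType([
    StructField("quote_count", LongType(), True),
    StructField("text", StringType(), True),
])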
Upvotes: 2