Parse JSON Data and save to MongoDB in PySpark

I am using PySpark and fetching data from a Kafka broker.

The code below does that:

import json
import sys
from pyspark import Row
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pymongo import MongoClient

# Spark context: local mode on all available cores, named for this app.
sc = SparkContext(master="local[*]", appName="TwitterStreamKafka")

# Streaming context built on sc, with a 1-second batch interval.
ssc = StreamingContext(sc, batchDuration=1)

# Receiver-based Kafka stream via ZooKeeper at localhost:2181, consumer group
# "spark-streaming-consumer"; the topic map {"twitter": 1} consumes the
# "twitter" topic with one consumer thread.
tweets = KafkaUtils.createStream(
    ssc,
    zkQuorum="localhost:2181",
    groupId="spark-streaming-consumer",
    topics={"twitter": 1},
)



I extracted the JSON from the tweets using the code below:

# Each Kafka record is a (key, value) pair; the tweet JSON is in the value.
tweet_json = tweets.map(lambda x: json.loads(x[1]))

Now I want to insert this tweet_json into MongoDB, but I couldn't figure out how to do that.

I checked the MongoDB Spark Connector; it says it requires a DataFrame to store data in MongoDB.

But the type of tweet_json is "pyspark.streaming.dstream.TransformedDStream".

How can I convert this into a DataFrame so that I can store it in MongoDB?


How can I save the fetched tweets to MongoDB using PySpark?

Thank you in advance!

Following the suggested edits, I changed the program as follows:

import json
import sys
from pyspark import Row
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pymongo import MongoClient
def convert(rdd):
    """Convert one micro-batch of Kafka records into a DataFrame.

    :param rdd: RDD of ``(key, value)`` pairs from ``KafkaUtils.createStream``;
        each value is a tweet serialized as a JSON string.
    :return: DataFrame with one row per tweet and a schema inferred from the
        JSON (nested objects become struct columns).
    """
    # RDD.toDF() is only attached to RDDs once a SparkSession/SQLContext has
    # been created; none existed here, hence "'PipelinedRDD' object has no
    # attribute 'toDF'". Reuse (or lazily create) a session for this context.
    spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()
    # Let Spark parse the raw JSON strings so nested objects get a proper
    # schema instead of relying on per-row dict inference.
    return spark.read.json(rdd.map(lambda x: x[1]))
# Spark Context created here
sc = SparkContext("local[*]", appName="TwitterStreamKafka")
# Spark Streaming Context created here (1-second batches)
ssc = StreamingContext(sc, 1)
# Kafka Stream created here
tweets = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"twitter": 1})


def _save_batch(rdd):
    """Append one micro-batch of tweets to MongoDB; empty batches are skipped."""
    if rdd.isEmpty():
        return
    # Target database/collection come from spark.mongodb.output.uri
    # (passed to spark-submit with --conf).
    (convert(rdd).write
        .format("com.mongodb.spark.sql.DefaultSource")
        .mode("append")
        .save())


# Tweets written to MongoDB here, batch by batch
tweets.foreachRDD(_save_batch)

# Nothing is consumed until the streaming context is started.
ssc.start()
ssc.awaitTermination()

Then I ran spark-submit as follows:

# The input key is spark.mongodb.input.uri; both URIs should name the host and
# the target database.collection, and the application .py file goes last.
spark-submit --conf "spark.mongodb.input.uri=mongodb://" --conf "spark.mongodb.output.uri=mongodb://" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 --jars spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar

It raised the following error:

File "/home/hduser/test/", line 24, in convert df_json = x: json.loads(x[1])).toDF() AttributeError: 'PipelinedRDD' object has no attribute 'toDF'

Data format returned by createStream:

(None, '{"lang": "en", "id": 967004613332303873, "favorited": false, "possibly_sensitive": false, "is_quote_status": false, "geo": null, "user": {"lang": "en", "profile_use_background_image": false, "profile_banner_url": "", "is_translator": false, "id": 19908749, "profile_sidebar_border_color": "FFFFFF", "favourites_count": 1912, "profile_background_tile": false, "profile_image_url_https": "", "friends_count": 1960, "profile_background_image_url": "", "has_extended_profile": false, "profile_link_color": "AB0D0D", "screen_name": "ZNConsulting", "geo_enabled": true, "url": "", "profile_text_color": "000000", "default_profile": false, "utc_offset": 3600, "is_translation_enabled": false, "statuses_count": 5833, "profile_background_image_url_https": "", "verified": false, "name": "ZN Consulting", "notifications": false, "protected": false, "id_str": "19908749", "translator_type": "none", "profile_image_url": "", "created_at": "Mon Feb 02 14:28:00 +0000 2009", "time_zone": "Brussels", "listed_count": 253, "follow_request_sent": false, "location": "Brussels; Our website:", "profile_background_color": "F7F7F7", "contributors_enabled": false, "entities": {"url": {"urls": [{"display_url": "", "expanded_url": "", "url": "", "indices": [0, 23]}]}, "description": {"urls": []}}, "default_profile_image": false, "following": false, "followers_count": 2312, "description": "The #digital communication agency in #Brussels. Strategy, digital campaigns, analysis & #socialmedia. 
#Hyperthinking to give you the #DigitalAdvantage \\ud83d\\ude0e", "profile_sidebar_fill_color": "C7C7C7"}, "in_reply_to_user_id_str": null, "contributors": null, "retweet_count": 0, "text": "Following user feedback, #Google is now blocking intrusive ads with a built-in adblocker in #Chrome \\u26d4\\n\\n", "retweeted": false, "truncated": false, "in_reply_to_user_id": null, "id_str": "967004613332303873", "source": "<a href=\\"\\" rel=\\"nofollow\\">TweetDeck</a>", "created_at": "Fri Feb 23 11:54:00 +0000 2018", "metadata": {"iso_language_code": "en", "result_type": "recent"}, "in_reply_to_screen_name": null, "in_reply_to_status_id_str": null, "entities": {"symbols": [], "urls": [{"display_url": "\\u2026", "expanded_url": "", "url": "", "indices": [103, 126]}], "user_mentions": [], "hashtags": [{"text": "Google", "indices": [25, 32]}, {"text": "Chrome", "indices": [92, 99]}]}, "coordinates": null, "in_reply_to_status_id": null, "place": null, "favorite_count": 0}')

mayank agrawal
You have to convert each RDD of the DStream into a DataFrame. Use .foreachRDD for this.

from pyspark.sql import SQLContext

# Creating an SQLContext (and the SparkSession behind it) is what makes
# RDD.toDF() available to convert().
sql = SQLContext(sparkContext=sc)

def _construct_key(previous_key, separator, new_key):
    if previous_key:
        return "{}{}{}".format(previous_key, separator, new_key)
        return new_key

def _flatten_JSON(nested_dict, separator='_', root_keys_to_ignore=set()):
    assert isinstance(nested_dict, dict)
    assert isinstance(separator, str)
    flattened_dict = dict()

    def _flatten(object_, key):        
        if isinstance(object_, dict):
            for object_key in object_:
                if not (not key and object_key in root_keys_to_ignore):
                    _flatten(object_[object_key], _construct_key(key, separator, object_key))
        elif isinstance(object_, list) or isinstance(object_, set):
            for index, item in enumerate(object_):
                _flatten(item, _construct_key(key, separator, index))
            flattened_dict[key] = object_

    _flatten(nested_dict, None)
    return flattened_dict

def convert(rdd):
    """Parse and flatten one micro-batch of Kafka records into a DataFrame.

    :param rdd: RDD of ``(key, value)`` pairs from ``KafkaUtils.createStream``;
        each value is a tweet serialized as a JSON string.
    :return: DataFrame with one column per flattened key (see ``_flatten_JSON``).
    :raises ValueError: presumably from ``toDF`` when ``rdd`` is empty (schema
        inference needs at least one row) -- callers should skip empty batches.
    """
    # x[1] is the message value; x[0] is the Kafka key (None in the sample data).
    # toDF() is available because an SQLContext was created above.
    return rdd.map(lambda x: _flatten_JSON(json.loads(x[1]))).toDF()

def write_mongo(rdd):
    """Append one micro-batch of tweets to MongoDB via the Spark connector.

    Empty batches are skipped because a DataFrame schema cannot be inferred
    from an empty RDD. The target database/collection come from the
    ``spark.mongodb.output.uri`` setting passed to spark-submit.
    """
    if rdd.isEmpty():
        return
    (convert(rdd).write
        .format("com.mongodb.spark.sql.DefaultSource")
        .mode("append")
        .save())


tweets.foreachRDD(write_mongo)

# The stream only starts processing once the streaming context is started.
ssc.start()
ssc.awaitTermination()

Also, you need to pass the configuration and packages to spark-submit, matching your Spark version:

# Each URI should name the host and target database.collection. Also add the
# Kafka assembly jar (--jars) and your application .py file at the end.
/bin/spark-submit --conf "spark.mongodb.input.uri=mongodb://" \
                  --conf "spark.mongodb.output.uri=mongodb://" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

Here is the format of the data fetched by createStream:

(None, '{"lang": "en", "id": 967004613332303873, "favorited": false, "possibly_sensitive": false, "is_quote_status": false, "geo": null, "user": {"lang": "en", "profile_use_background_image": false, "profile_banner_url": "", "is_translator": false, "id": 19908749, "profile_sidebar_border_color": "FFFFFF", "favourites_count": 1912, "profile_background_tile": false, "profile_image_url_https": "", "friends_count": 1960, "profile_background_image_url": "", "has_extended_profile": false, "profile_link_color": "AB0D0D", "screen_name": "ZNConsulting", "geo_enabled": true, "url": "", "profile_text_color": "000000", "default_profile": false, "utc_offset": 3600, "is_translation_enabled": false, "statuses_count": 5833, "profile_background_image_url_https": "", "verified": false, "name": "ZN Consulting", "notifications": false, "protected": false, "id_str": "19908749", "translator_type": "none", "profile_image_url": "", "created_at": "Mon Feb 02 14:28:00 +0000 2009", "time_zone": "Brussels", "listed_count": 253, "follow_request_sent": false, "location": "Brussels; Our website:", "profile_background_color": "F7F7F7", "contributors_enabled": false, "entities": {"url": {"urls": [{"display_url": "", "expanded_url": "", "url": "", "indices": [0, 23]}]}, "description": {"urls": []}}, "default_profile_image": false, "following": false, "followers_count": 2312, "description": "The #digital communication agency in #Brussels. Strategy, digital campaigns, analysis & #socialmedia. 
#Hyperthinking to give you the #DigitalAdvantage \\ud83d\\ude0e", "profile_sidebar_fill_color": "C7C7C7"}, "in_reply_to_user_id_str": null, "contributors": null, "retweet_count": 0, "text": "Following user feedback, #Google is now blocking intrusive ads with a built-in adblocker in #Chrome \\u26d4\\n\\n", "retweeted": false, "truncated": false, "in_reply_to_user_id": null, "id_str": "967004613332303873", "source": "<a href=\\"\\" rel=\\"nofollow\\">TweetDeck</a>", "created_at": "Fri Feb 23 11:54:00 +0000 2018", "metadata": {"iso_language_code": "en", "result_type": "recent"}, "in_reply_to_screen_name": null, "in_reply_to_status_id_str": null, "entities": {"symbols": [], "urls": [{"display_url": "\\u2026", "expanded_url": "", "url": "", "indices": [103, 126]}], "user_mentions": [], "hashtags": [{"text": "Google", "indices": [25, 32]}, {"text": "Chrome", "indices": [92, 99]}]}, "coordinates": null, "in_reply_to_status_id": null, "place": null, "favorite_count": 0}')

