Prithvi Singh

Reputation: 97

Way to use previous data with current data in pyspark with kafka stream

I am sending dict objects from my producer and using PySpark to create a new object. But the kind of object I want to form also requires the key-value pair of the previous data. I tried window batching and reduceByKey, but neither of them seems to work.

Suppose my producer object is a "url_id": "url" pair, for example {"url_id": "google.com"}, and in Spark I want to form an object like: {"data": {"url_id": "url", "url_id_of_previous_url": "url", ... and so on}}

My Spark code is:

    import json

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # appName, batchTime and topic are defined elsewhere
    conf = SparkConf().setAppName(appName).setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stream_context = StreamingContext(sparkContext=sc, batchDuration=batchTime)
    kafka_stream = KafkaUtils.createDirectStream(ssc=stream_context, topics=[topic],
                                                 kafkaParams={"metadata.broker.list": "localhost:9092",
                                                              "auto.offset.reset": "smallest"})
    lines = kafka_stream.map(lambda x: json.loads(x[1]))

I am stuck after this. Can you tell me whether forming such an object is possible with Spark? And if it is, what should I use?

Upvotes: 0

Views: 382

Answers (1)

suresiva

Reputation: 3173

As far as I know, you can solve this in two ways.

The first approach is the simpler one: let the message-producing application itself send the pair of messages (current & previous) by keeping an internal cache of the last message it produced.
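
A minimal sketch of that producer-side caching, assuming the kafka-python client; the send_with_previous helper, the broker address and the current/previous field names are just placeholders for illustration:

    import json
    from kafka import KafkaProducer

    # serialize each dict to JSON before writing it to the topic
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda d: json.dumps(d).encode('utf-8'))

    previous_msg = None  # internal cache of the last message produced

    def send_with_previous(topic, current_msg):
        global previous_msg
        # ship the current message together with the one sent just before it
        producer.send(topic, {'current': current_msg, 'previous': previous_msg})
        previous_msg = current_msg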

The second approach is to use Spark stateful streaming to maintain the last message's values in the Spark state context. Since you are using PySpark, the only option I know of is updateStateByKey with checkpointing enabled.

The typical flow with PySpark Streaming will be as follows:

  • Define the initial state value and the update function.
  • Maintain a common key to match the current and previous messages; I used pair_msgs in this example.

    # RDD holding the initial state (key, value) pair
    initialStateRDD = sc.parallelize([(u'pair_msgs', '{"url_id": "none"}')])

    def updateFunc(new_url_msgs, last_url_msg):
        # new_url_msgs is the list of messages received for the key in this batch;
        # last_url_msg is the JSON string kept in state from the previous batch
        if not new_url_msgs:
            return last_url_msg
        new_url_dict = json.loads(new_url_msgs[0])
        new_url_dict['url_id_previous'] = json.loads(last_url_msg)['url_id']
        return json.dumps(new_url_dict)
    
  • Map the input messages to the common key, pair_msgs in this example.

  • Invoke the updateStateByKey transformation with the above update function.

    # keep only the message payload from each (key, value) tuple coming from Kafka
    feeds = kafka_stream.map(lambda x: x[1])

    # pair every message with the common key so updateStateByKey tracks a single state
    pair_feed = feeds.map(lambda feed_str: ('pair_msgs', feed_str)) \
                     .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    
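For completeness, a sketch of how the job could be wired up and run; updateStateByKey requires checkpointing to be enabled, and the checkpoint directory and the pprint() output below are only placeholders:

    # checkpointing is mandatory for updateStateByKey; the directory is a placeholder
    stream_context.checkpoint('/tmp/url_pair_checkpoint')

    # print the enriched JSON strings of each batch, then run the job
    pair_feed.map(lambda kv: kv[1]).pprint()

    stream_context.start()
    stream_context.awaitTermination()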

[Note: As far as I know, PySpark Structured Streaming is yet to get stateful streaming support, so I believe the above example still makes sense.]

Upvotes: 2
