Reputation: 97
I am sending dict objects from my producer and using PySpark to create a new object. But the kind of object I want to form also requires the key/value pair of the previous message. I tried window batching and reduceByKey, but neither of them seems to work.
Suppose my producer emits a stream of "url_id"/"url" pairs, e.g. {"url_id": "google.com"}, and in Spark I want to form an object like: {"data": {"url_id": "url", "url_id_of_previous_url": "url", ...and so on}}
My spark code is:
conf = SparkConf().setAppName(appName).setMaster("local[*]")
sc = SparkContext(conf=conf)
stream_context = StreamingContext(sparkContext=sc, batchDuration=batchTime)
kafka_stream = KafkaUtils.createDirectStream(ssc=stream_context, topics=[topic],
                                             kafkaParams={"metadata.broker.list": "localhost:9092",
                                                          "auto.offset.reset": "smallest"})
lines = kafka_stream.map(lambda x: json.loads(x[1]))
I am stuck after this. Can you tell me whether forming such an object is possible with Spark? And if it is, what should I use?
Upvotes: 0
Views: 382
Reputation: 3173
As far as I know, you can solve this in two ways.
The first approach is the simpler one: let the message-producing application itself send the pair of messages (current & previous) by enabling some internal caching in it.
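A minimal sketch of that producer-side caching, in plain Python so it runs without a broker (the field name url_id_previous and the "none" sentinel are assumptions mirroring the state example below; in a real producer you would pass the resulting string to your Kafka send call):

```python
import json

def make_paired_message(current, previous):
    # Build the enriched payload carrying the previous message's url_id
    # alongside the current pair. 'url_id_previous' is an assumed field name.
    payload = dict(current)
    payload['url_id_previous'] = previous['url_id'] if previous else 'none'
    return json.dumps(payload)

# The producer keeps the last message in a simple variable (its "cache")
# and sends the enriched payload instead of the raw one.
last_msg = None
out = []
for msg in [{"url_id": "google.com"}, {"url_id": "yahoo.com"}]:
    out.append(make_paired_message(msg, last_msg))
    last_msg = msg
```

This keeps all pairing logic out of Spark entirely, at the cost of coupling it to the producer.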
The second approach is to use Spark Stateful Streaming to maintain the last message's values in Spark's state context. As you are using PySpark, the only option I know of is updateStateByKey with checkpointing enabled.
The typical flow with PySpark Streaming is as follows:
- Maintain a common key to match the current and previous messages; I used pair_msgs in this example.
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'pair_msgs', '{"url_id":"none"}')])

def updateFunc(new_url_msg, last_url_msg):
    # new_url_msg is the list of values received for the key in this batch
    if not new_url_msg:
        return last_url_msg
    else:
        new_url_dict = json.loads(new_url_msg[0])
        new_url_dict['url_id_previous'] = json.loads(last_url_msg)['url_id']
        return json.dumps(new_url_dict)
- Map the input messages to the common key, pair_msgs in this example.
- Invoke the updateStateByKey transformation with the above update function.
# updateStateByKey requires a checkpoint directory (path here is just an example)
stream_context.checkpoint("checkpoint")

feeds = kafka_stream.map(lambda x: x[1])
pair_feed = feeds.map(lambda feed_str: ('pair_msgs', feed_str)) \
                 .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
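To see what the state update does batch by batch, here is a plain-Python simulation of the same update function (no Spark needed; the batch contents are made up for illustration):

```python
import json

def updateFunc(new_url_msg, last_url_msg):
    # new_url_msg is the list of values seen for the key in this batch
    if not new_url_msg:
        return last_url_msg
    new_url_dict = json.loads(new_url_msg[0])
    new_url_dict['url_id_previous'] = json.loads(last_url_msg)['url_id']
    return json.dumps(new_url_dict)

state = '{"url_id":"none"}'  # same value as the initialStateRDD entry
# Three simulated micro-batches; the empty one shows state being carried over.
for batch in [['{"url_id":"google.com"}'], [], ['{"url_id":"yahoo.com"}']]:
    state = updateFunc(batch, state)
```

After the last batch, the state holds the current url_id ("yahoo.com") together with url_id_previous ("google.com"), which is exactly the pairing the question asks for.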
[Note: As far as I know, PySpark Structured Streaming is yet to get stateful streaming support, so I believe the above example still makes sense]
Upvotes: 2