João

Reputation: 187

How to store the data from each Spark Streaming iteration in one RDD?

I am new to Spark. I am writing the following script, which receives a stream from Kafka and transforms it into an RDD.

My goal is to store the data from each stream iteration in memory in a single RDD, much like appending an element to a list on each loop.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("Application")
sc = SparkContext(conf=conf)

def joinRDDs(rdd):
    # Runs on the driver for every batch; the result is recomputed and discarded each time
    elements = rdd.collect()
    rdds = sc.parallelize(elements)
    transformed = rdds.map(lambda x: ('key', {u'name': x[1]}))

if __name__ == '__main__':
    ssc = StreamingContext(sc, 2)
    # topic and host are defined elsewhere
    stream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": host})
    stream.foreachRDD(joinRDDs)
    ssc.start()
    ssc.awaitTermination()

How can I accomplish this?

Thank you for your attention.

Upvotes: 0

Views: 427

Answers (1)

Bapi Akula

Reputation: 21

Use updateStateByKey() and pass it an update function. The function takes two arguments: the new data that arrives in each batch and the historic state you are holding in memory.

def countPurchasers(newValues, lastSum):
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)

updateStateByKey(countPurchasers)
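For context, a minimal sketch of how such an update function might be wired into the streaming job from the question. The checkpoint path, topic name, broker address, and the choice of counting key are placeholder assumptions, and stateful transformations like updateStateByKey() require checkpointing to be enabled:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("Application")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
ssc.checkpoint("/tmp/spark-checkpoint")  # required for stateful transformations (placeholder path)

def countPurchasers(newValues, lastSum):
    # newValues: values seen for this key in the current batch
    # lastSum: state carried over from previous batches (None the first time a key is seen)
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)

stream = KafkaUtils.createDirectStream(ssc, ["purchases"],  # placeholder topic
                                       {"metadata.broker.list": "localhost:9092"})  # placeholder broker

# Map each Kafka (key, message) pair to a counting key, then fold every batch into the running state.
counts = stream.map(lambda kv: (kv[1], 1)).updateStateByKey(countPurchasers)
counts.pprint()

ssc.start()
ssc.awaitTermination()

On every batch interval Spark calls the update function once per key with that batch's new values and the previously stored state, so the accumulated result persists across iterations without collecting anything to the driver.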

Upvotes: 0
