shaikh

Reputation: 592

Spark Streaming with Python: Joining two streams on a particular attribute

I am receiving two socket streams S1 and S2 with schemas S1 and S2 respectively.

I would like to join S1 and S2 on attribute "a" using Spark Streaming. My code is as follows:

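    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[3]", "StreamJoin")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval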

    S1 = ssc.socketTextStream("localhost", 9999)
    S2 = ssc.socketTextStream("localhost", 8085)

    # Create windowed stream
    wS1 = S1.window(10)
    wS2 = S2.window(1)

    wS1.flatMap(lambda line: line.split(",")).pprint()
    wS2.flatMap(lambda line: line.split(",")).pprint()

    # Perform join
    joinedStream = wS1.join(wS2)

    joinedStream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))

    ssc.start()             
    ssc.awaitTermination()

Both S1 and S2 are comma separated.

The code above does perform a join, but it matches on the complete row.

I want to join the two streams on a particular attribute, in this case attribute 'a'. How can I achieve this?

Thanks a lot!

Upvotes: 0

Views: 830

Answers (1)

user3689574

Reputation: 1676

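Join in Spark works on key-value pairs: each row must be a (key, value) tuple, and rows are matched on the key, which is the value at row[0]. So split each line into its fields with map (flatMap would flatten every field into a separate element, so x[0] would be a single character rather than the first field), key each row by the attribute you want, and join the keyed streams. Assuming "a" is the first field:

    kS1 = wS1.map(lambda line: line.split(",")).map(lambda x: (x[0], x))
    kS2 = wS2.map(lambda line: line.split(",")).map(lambda x: (x[0], x))
    kS1.join(kS2).pprint()

The join is then done on the first element of the split list.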
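If attribute "a" is not the first field, the same idea can be sketched with a small helper that keys each row by a chosen column index. The names key_by_column, build_join and main are made up for illustration, the column positions are assumptions (the question does not state where "a" sits), and the hosts, ports and window sizes are copied from the question. Treat it as a rough sketch rather than a tested streaming job:

```python
def key_by_column(line, index=0, sep=","):
    """Turn one delimited line into a (key, fields) pair.

    `index` is the assumed position of the join attribute in the row.
    """
    fields = [field.strip() for field in line.split(sep)]
    return (fields[index], fields)


def build_join(s1, s2, index1=0, index2=0):
    """Key two DStreams of text lines and join them on the chosen columns."""
    keyed1 = s1.map(lambda line: key_by_column(line, index1))
    keyed2 = s2.map(lambda line: key_by_column(line, index2))
    # DStream.join matches elements whose keys are equal within each batch.
    return keyed1.join(keyed2)


def main():
    # Imported here so key_by_column can be tried without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[3]", "StreamJoin")
    ssc = StreamingContext(sc, 1)
    s1 = ssc.socketTextStream("localhost", 9999).window(10)
    s2 = ssc.socketTextStream("localhost", 8085).window(1)
    build_join(s1, s2).pprint()
    ssc.start()
    ssc.awaitTermination()
```

Calling main() starts the job. Keeping the parsing in a plain function makes it easy to check on sample lines before wiring it into the streams.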

Docs reference:

https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=join#pyspark.RDD.join
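To see what shape the joined rows take, here is a plain-Python emulation of the pair-join semantics described in those docs: every pair of elements with matching keys comes back as a (k, (v1, v2)) tuple. The function inner_join and the sample rows are invented for illustration; this is not how Spark implements the join, only a way to check the expected output on small data:

```python
from collections import defaultdict


def inner_join(left, right):
    """Emulate pair-join semantics on two lists of (key, value) tuples."""
    by_key = defaultdict(list)
    for key, value in right:
        by_key[key].append(value)
    # Emit one (key, (left_value, right_value)) tuple per matching pair.
    return [(key, (lv, rv)) for key, lv in left for rv in by_key.get(key, [])]


# Hypothetical rows, already keyed on their first field.
left = [("1", ["1", "x"]), ("2", ["2", "y"])]
right = [("1", ["1", "p"]), ("3", ["3", "q"])]

result = inner_join(left, right)
print(result)  # [('1', (['1', 'x'], ['1', 'p']))]
```

Only key "1" appears on both sides, so it is the only row in the result; unmatched keys are dropped, as in an inner join.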

Upvotes: 0
