awenclaw

Reputation: 543

Join DStream with dynamic dataset

I am new to Spark Streaming. I need to enrich events coming from a stream with data from a dynamic dataset, and I am having trouble creating that dynamic dataset. The dataset should be fed by data coming from a different stream (though this stream will have much lower throughput than the main stream of events). Additionally, the dataset will be approximately 1-3 GB in size, so using a simple HashMap will not be sufficient (in my opinion).

In Spark Streaming Programming Guide I have found:

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

along with the explanation: "In fact, you can also dynamically change the dataset you want to join against." This is the part I don't understand at all: how can an RDD be dynamically changed? Isn't it immutable?

Below you can see my code. The idea is to add every new RDD from myStream to myDataset, but apparently this doesn't work the way I would like it to.

val ssc = new StreamingContext(conf, Seconds(5))
val myDataset: RDD[String] = ssc.sparkContext.emptyRDD[String]
val myStream = ssc.socketTextStream("localhost", 9997)
myStream.foreachRDD(rdd => {myDataset.union(rdd)})
myDataset.foreach(println)

I would appreciate any help or advice. Regards!

Upvotes: 2

Views: 1306

Answers (1)

Mark Rajcok

Reputation: 364727

Yes, RDDs are immutable. One issue with your code is that union() returns a new RDD; it does not alter the existing myDataset RDD.
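To make the distinction concrete, here is a minimal standalone Scala sketch (an illustration, not part of your code): union() leaves its receiver untouched, and you only keep the combined data by holding on to the returned RDD, for example by rebinding a var.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("union-demo"))

    val a: RDD[String] = sc.parallelize(Seq("x"))
    val b: RDD[String] = sc.parallelize(Seq("y"))

    a.union(b)                    // returns a new RDD; the result is discarded, a is unchanged
    println(a.count())            // still 1

    var current: RDD[String] = a  // a var lets us rebind the reference instead
    current = current.union(b)    // 'current' now points at a new RDD holding both elements
    println(current.count())      // 2

    sc.stop()
  }
}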

The Programming Guide says the following:

In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that dataset reference points to.

The first sentence might read better as follows:

In fact, you can also dynamically change which dataset you want to join against.

So we can change the RDD that dataset references, but not the RDD itself. Here's an example of how this could work (using Python):

# Run as follows:
# $ spark-submit ./match_ips_streaming_simple.py 2> err
# In another window run:
# $ nc -lk 9999
# Then enter IP addresses separated by spaces into the nc window
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

BATCH_INTERVAL = 2
SLEEP_INTERVAL = 8
sc       = SparkContext("local[*]", "IP-Matcher")
ssc      = StreamingContext(sc, BATCH_INTERVAL)
ips_rdd  = sc.parallelize(set())
lines_ds = ssc.socketTextStream("localhost", 9999)
# split each line into IPs
ips_ds   = lines_ds.flatMap(lambda line: line.split(" "))
pairs_ds = ips_ds.map(lambda ip: (ip, 1))
# join with the IPs RDD
matches_ds = pairs_ds.transform(lambda rdd: rdd.join(ips_rdd))
matches_ds.pprint()
ssc.start()

# alternate between two sets of IP addresses for the RDD
IP_FILES   = ('ip_file1.txt', 'ip_file2.txt')
file_index = 0
while True:
    with open(IP_FILES[file_index]) as f:
        ips = f.read().splitlines()
    ips_rdd = sc.parallelize(ips).map(lambda ip: (ip, 1))
    print("using %s" % IP_FILES[file_index])
    file_index = (file_index + 1) % len(IP_FILES)
    time.sleep(SLEEP_INTERVAL)
#ssc.awaitTermination()

In the while loop, I change the RDD that ips_rdd references every 8 seconds. The join() transformation will use whatever RDD ips_rdd currently references.

$ cat ip_file1.txt
1.2.3.4
10.20.30.40
$ cat ip_file2.txt
5.6.7.8
50.60.70.80

$ spark-submit ./match_ips_streaming_simple.py  2> err
using ip_file1.txt
-------------------------------------------
Time: 2015-09-09 17:18:20
-------------------------------------------

-------------------------------------------
Time: 2015-09-09 17:18:22
-------------------------------------------

-------------------------------------------
Time: 2015-09-09 17:18:24
-------------------------------------------
('1.2.3.4', (1, 1))
('10.20.30.40', (1, 1))

using ip_file2.txt
-------------------------------------------
Time: 2015-09-09 17:18:26
-------------------------------------------

-------------------------------------------
Time: 2015-09-09 17:18:28
-------------------------------------------
('50.60.70.80', (1, 1))
('5.6.7.8', (1, 1))
...

While the above job is running, in another window:

$ nc -lk 9999
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8
<... wait for the other RDD to load ...>
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8
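Coming back to the Scala code in your question, the same rebinding pattern might look roughly like the sketch below (untested; the ports and the (String, Int) pairing are placeholders). Note that growing the dataset through repeated union() calls builds up lineage, so in practice you would periodically checkpoint or rebuild the RDD.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DynamicJoin {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("dynamic-join"))
    val ssc = new StreamingContext(sc, Seconds(5))

    // Reference to the lookup dataset; rebound as updates arrive. Both the
    // transform() and foreachRDD() bodies run on the driver, so a driver-side
    // var is visible to both; @volatile is a safety hedge.
    @volatile var myDataset: RDD[(String, Int)] = sc.emptyRDD[(String, Int)]

    // Low-throughput stream feeding the lookup dataset (port is hypothetical).
    val updates = ssc.socketTextStream("localhost", 9997)
    updates.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Rebind to a new RDD; cache it since it is reused every batch.
        myDataset = myDataset.union(rdd.map(line => (line, 1))).cache()
      }
    }

    // Main event stream, joined against whatever RDD myDataset points to now.
    val events = ssc.socketTextStream("localhost", 9998)
    val joined = events.map(e => (e, 1)).transform(rdd => rdd.join(myDataset))
    joined.print()

    ssc.start()
    ssc.awaitTermination()
  }
}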

Upvotes: 2
