arkrad87

Reputation: 196

Code reuse between spark streaming and batch mode for distinct elements

I'm a Spark newbie and I want to implement a lambda architecture using Spark Streaming and Spark batch processing.

Looking on the web I found the following article:

http://blog.cloudera.com/blog/2014/08/building-lambda-architecture-with-spark-streaming/

This works for some of my analyses, but I don't think the approach is feasible when distinct elements have to be found.

If you want to find the distinct elements of a JavaRDD you can use the distinct method. A DStream is a sequence of RDDs, so if you apply

transform((rdd) -> rdd.distinct())

to the DStream, distinct is executed on each RDD of the stream. You therefore get the distinct elements within each RDD, not across the whole DStream.

That may be a bit confusing as written, so let me clarify with an example:

I have the following elements:

Apple
Pear
Banana
Peach
Apple
Pear

In the batch app:

JavaRDD<String> elemsRDD = sc.textFile(exFilePath).distinct();

The child RDD will contain:

Apple
Pear
Banana
Peach

If I have understood correctly, this is the behaviour for the stream.

Suppose we have a batch time of 1s and a window of 2s:

First RDD:

Apple  
Pear
Banana

Second RDD:

Peach
Apple
Pear

JavaDStream<String> elemsStream = (obtained from whatever source)
JavaDStream<String> childStream = elemsStream.transform((rdd) -> rdd.distinct());
childStream.foreachRDD...

This ends up with 2 RDDs. First:

Apple  
Pear
Banana

Second:

Peach
Apple
Pear

These are distinct with respect to each RDD, but not with respect to the DStream.
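
To make the difference concrete without a running Spark job, here is a minimal plain-Java sketch of the same example. It is only an analogy: each inner list stands in for one RDD of the DStream, and the names `DistinctScopeSketch`, `distinctPerBatch` and `distinctOverAll` are made up for illustration and are not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Plain-Java stand-in for Spark: each inner list plays the role of one RDD (micro-batch).
class DistinctScopeSketch {

    // Analogue of rdd.distinct(): removes duplicates inside a single batch only.
    static List<String> distinct(List<String> batch) {
        return new ArrayList<>(new LinkedHashSet<>(batch));
    }

    // Analogue of transform(rdd -> rdd.distinct()): applied to every batch independently.
    static List<List<String>> distinctPerBatch(List<List<String>> batches) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> batch : batches) {
            result.add(distinct(batch));
        }
        return result;
    }

    // Analogue of distinct over the union of all batches, which is what the batch job sees.
    static List<String> distinctOverAll(List<List<String>> batches) {
        List<String> union = new ArrayList<>();
        for (List<String> batch : batches) {
            union.addAll(batch);
        }
        return distinct(union);
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("Apple", "Pear", "Banana"),
                Arrays.asList("Peach", "Apple", "Pear"));
        System.out.println(distinctPerBatch(batches));
        System.out.println(distinctOverAll(batches));
    }
}
```

Running `main` prints `[[Apple, Pear, Banana], [Peach, Apple, Pear]]` followed by `[Apple, Pear, Banana, Peach]`. Unlike this sketch, Spark's `distinct` gives no guarantee about the order of the result.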

My solution for the Streaming part was the following:

// Wrap every element in a singleton set, then merge the sets pairwise.
JavaDStream<HashSet<String>> distinctElems = elemsStream.map(
                (elem) -> {
                    HashSet<String> htSet = new HashSet<String>();
                    htSet.add(elem);
                    return htSet;
                }).reduce((sp1, sp2) -> {
                    sp1.addAll(sp2); // set union drops duplicates
                    return sp1;
                });

In this way the result is:

Apple
Pear
Banana
Peach

the same as in batch mode. However, this solution adds maintenance overhead and risks errors from keeping duplicate code bases.

Is there a better way to reach the same outcome while reusing as much of the batch-mode code as possible?
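
The idea behind the set-based approach can be sketched in plain Java, assuming a single in-memory list in place of the stream. `SetUnionReduceSketch` and `distinctViaSetUnion` are hypothetical names used only for this illustration. As far as I understand, `reduce` on a DStream combines the elements of each RDD, so in Spark the scope of the resulting set would follow the batch or window the stream is built on.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java analogue of map(elem -> singleton set).reduce(set union).
class SetUnionReduceSketch {

    static Set<String> distinctViaSetUnion(List<String> elems) {
        return elems.stream()
                // Wrap each element in its own mutable singleton set.
                .map(elem -> {
                    Set<String> single = new HashSet<>();
                    single.add(elem);
                    return single;
                })
                // Merge sets pairwise; the union silently drops duplicates.
                .reduce((left, right) -> {
                    left.addAll(right);
                    return left;
                })
                // An empty input yields an empty set.
                .orElseGet(HashSet::new);
    }
}
```

For the six fruit names above, `distinctViaSetUnion` returns a set with the four elements Apple, Pear, Banana and Peach.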

Thanks in advance.

Upvotes: 4

Views: 1174

Answers (1)

fhuertas

Reputation: 5473

Your solution is elegant.

I have another solution. It is less elegant than yours, and I don't know for sure whether it is more efficient. It is based on mapToPair:

JavaPairDStream<String, Integer> distinctElems = elemsStream
       .mapToPair(event -> new Tuple2<String, Integer>(event,1));
distinctElems = distinctElems.reduceByKey((t1, t2) -> t1);

I think it is more efficient, but I have not been able to test it.
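
The effect of pairing each element with 1 and keeping the first value per key can be illustrated with a plain-Java analogue, assuming in-memory collections instead of a DStream. Here `Collectors.toMap` with a merge function plays the role of `reduceByKey`; `KeepFirstPerKeySketch` and `distinctKeys` are hypothetical names, not Spark API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogue of mapToPair(event -> (event, 1)).reduceByKey((t1, t2) -> t1).
class KeepFirstPerKeySketch {

    static List<String> distinctKeys(List<String> events) {
        Map<String, Integer> reduced = events.stream()
                .collect(Collectors.toMap(
                        event -> event,      // key: the element itself
                        event -> 1,          // value: a dummy count
                        (t1, t2) -> t1,      // on a duplicate key keep the first value
                        LinkedHashMap::new)); // keep first-seen order for readability
        // The surviving keys are the distinct elements.
        return new ArrayList<>(reduced.keySet());
    }
}
```

With the fruit list from the question, `distinctKeys` returns Apple, Pear, Banana, Peach. The values carry no information, so only the keys of the reduced pairs matter.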

Upvotes: 1
