Dazzler

Reputation: 847

How to figure out if DStream is empty

I have two inputs: the first is a stream (say input1) and the second is a batch (say input2). I want to find out whether the keys in the first input match a single row or more than one row in the second input. The further transformations/logic depend on the number of matching rows, i.e. whether a single row matches or multiple rows match (for at least one key in the first input):

if (singleRowMatches) {
     // do something
} else {
     // do something else
}

Code that I have tried so far:

val input1Pair = streamData.map(x => (x._1, x))
val input2Pair = input2.map(x => (x._1, x))
val joinData = input1Pair.transform { x => input2Pair.leftOuterJoin(x) }
val result = joinData.mapValues {
  case (v, Some(a)) => 1L // counts one per matched pair
  case (v, None) => 0L    // no stream row for this input2 key
}.reduceByKey(_ + _).filter(_._2 > 1) // keep keys with more than one match

I have written the code above. When I call result.print(), it prints nothing if every key matches only one row in input2. Since a DStream may consist of multiple RDDs, I am not sure how to find out whether the DStream is empty. If that were possible, I could do an if check.

Upvotes: 2

Views: 4177

Answers (1)

maasg

Reputation: 37435

There's no function to determine if a DStream is empty, as a DStream represents a collection over time. From a conceptual perspective, an empty DStream would be a stream that never has data and that would not be very useful.

What can be done is to check whether a given microbatch has data or not:

dstream.foreachRDD { rdd => if (rdd.isEmpty()) { /* empty micro-batch */ } else { /* process it */ } }

Note that a DStream produces one RDD per batch interval, so at any given batch time there is only one RDD to check.
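Applied to the question, the per-batch check amounts to testing, in every batch, whether the filtered `result` RDD is empty. The sketch below models a single batch with plain Scala collections rather than Spark, so a `Seq` stands in for the RDD and `Seq.isEmpty` for `RDD.isEmpty()`; `PerBatchBranch` and `branch` are hypothetical names used only for illustration.

```scala
object PerBatchBranch {
  // Hypothetical local model of ONE micro-batch of the question's `result`
  // before its final filter: (key, number of input2 rows that key matched).
  def branch(matchCounts: Seq[(String, Long)]): String = {
    // Same filter as `.filter(_._2 > 1)` in the question.
    val multi = matchCounts.filter(_._2 > 1)
    // Plays the role of `rdd.isEmpty()` inside `foreachRDD`:
    // empty means no key matched more than one row in this batch.
    if (multi.isEmpty) "single-row branch" else "multi-row branch"
  }
}
```

In Spark, the same decision would sit inside `result.foreachRDD { rdd => if (rdd.isEmpty()) ... else ... }` and run once per batch interval. An empty filtered result also covers keys that matched no row at all, so the "single-row" branch really means "at most one match".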

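I think the actual question is how to check the number of matches between the reference RDD and the data in the DStream. Probably the easiest way is to intersect both collections and check the size of the intersection. Keep in mind that `intersection` compares whole elements (so both collections must have the same element type) and returns each common element only once, even if the inputs contain duplicates: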

// per batch: the elements present both in the micro-batch and in input2
val intersectionDStream = streamData.transform { rdd => rdd.intersection(input2) }
intersectionDStream.foreachRDD { rdd =>
  if (rdd.count() > 1) {
    // ..do stuff with the matches
  } else {
    // ..do otherwise
  }
}

We could also place the RDD-centric transformations within the foreachRDD operation:

streamData.foreachRDD { rdd =>
  // the same intersection, computed directly on this batch's RDD
  val matches = rdd.intersection(input2)
  if (matches.count() > 1) {
    // ..do stuff with the matches
  } else {
    // ..do otherwise
  }
}
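If the goal is the per-key count the question describes (does at least one key match more than one row of input2?), counting reference rows per key may be closer than the overall size of an intersection. The following is a minimal sketch using plain Scala collections instead of Spark: `batch` stands in for one micro-batch RDD and `reference` for input2, both keyed by their first field as in the question. `KeyMatchCounts`, `matchCounts` and `anyMultipleMatches` are hypothetical names.

```scala
object KeyMatchCounts {
  // Hypothetical local model: `batch` plays one micro-batch RDD,
  // `reference` plays the static input2 RDD; both are (key, row) pairs.

  // For every distinct key in the batch, how many reference rows share that key.
  def matchCounts[K, A, B](batch: Seq[(K, A)], reference: Seq[(K, B)]): Map[K, Int] = {
    val refCounts: Map[K, Int] =
      reference.groupBy(_._1).map { case (k, rows) => k -> rows.size }
    batch.map(_._1).distinct.map(k => k -> refCounts.getOrElse(k, 0)).toMap
  }

  // True if at least one key of the batch matches more than one reference row.
  def anyMultipleMatches[K, A, B](batch: Seq[(K, A)], reference: Seq[(K, B)]): Boolean =
    matchCounts(batch, reference).values.exists(_ > 1)
}
```

For example, with `reference = Seq(("k1", 10), ("k2", 20), ("k2", 21))` and a batch containing keys `k1` and `k2`, the counts are `k1 -> 1` and `k2 -> 2`, so the multi-row branch applies. In Spark, one way to express the same shape would be counting input2 rows per key with `reduceByKey`, joining that with each batch inside `transform`, and branching in `foreachRDD`.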

Upvotes: 4
