I am reading data from HBase and running a Spark job on it. My table has around 70k rows, and each row has a column 'type' whose value is post, comment or reply. Based on the type, I want to extract a different pair RDD for each type, as shown below for post:
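import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Builds a (row key, FlumePost) pair for rows whose cf:t column is "post"; every other row maps to null.
JavaPairRDD<ImmutableBytesWritable, FlumePost> postPairRDD = hBaseRDD.mapToPair(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, ImmutableBytesWritable, FlumePost>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<ImmutableBytesWritable, FlumePost> call(Tuple2<ImmutableBytesWritable, Result> arg0)
                    throws Exception {
                FlumePost flumePost = new FlumePost();
                ImmutableBytesWritable key = arg0._1;
                Result result = arg0._2;
                // Read the row type from column family "cf", qualifier "t".
                String type = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("t")));
                // "post".equals(type) avoids a NullPointerException when the column is missing.
                if ("post".equals(type)) {
                    return new Tuple2<ImmutableBytesWritable, FlumePost>(key, flumePost);
                } else {
                    return null; // non-post rows produce a null entry
                }
            }
        }).distinct();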
The problem is that for every row whose type is not post I have to return null, which is undesirable. In addition, all 70k rows are iterated once for each of the three types, which wastes cycles. So my first question is:
1) What is an efficient way to do this?
After the mapping I still have 70k results, so I call distinct() to remove the duplicate null values. That leaves exactly one null object in the RDD: I expect 20327 results but get 20328.
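A small local analog shows why the count is off by exactly one. It uses plain java.util.stream rather than Spark, and the row keys and types are made up for illustration; the point is only that distinct() collapses all the nulls into a single null that survives next to the real keys.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Local analog of the Spark job using plain Java streams (not Spark).
// Each element is a hypothetical {rowKey, type} pair standing in for an HBase row.
class DistinctNullDemo {

    // Mimics the mapToPair above: keep the key for "post" rows, emit null otherwise.
    static List<String> mapWithNulls(List<String[]> rows) {
        return rows.stream()
                .map(r -> "post".equals(r[1]) ? r[0] : null)
                .collect(Collectors.toList());
    }

    // distinct() collapses every null into one null, so one extra element survives.
    static long distinctCount(List<String> mapped) {
        return mapped.stream().distinct().count();
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"r1", "post"},
                new String[] {"r2", "comment"},
                new String[] {"r3", "post"},
                new String[] {"r4", "reply"});
        List<String> mapped = mapWithNulls(rows);
        System.out.println(distinctCount(mapped)); // 2 posts + 1 null = 3
        // Dropping nulls before distinct() gives the expected number of posts.
        System.out.println(mapped.stream().filter(Objects::nonNull).distinct().count()); // 2
    }
}
```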
2) Is there a way to remove this null entry from the pair RDD?
You can use the filter operation on the RDD (Function here is org.apache.spark.api.java.function.Function). Simply call:
.filter(new Function<Tuple2<ImmutableBytesWritable, FlumePost>, Boolean>() {
    @Override
    public Boolean call(Tuple2<ImmutableBytesWritable, FlumePost> v1) throws Exception {
        // Keep only the non-null pairs produced by mapToPair.
        return v1 != null;
    }
})
before calling distinct() to filter out the nulls.
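Filtering on the type column before mapping (in Spark, a hBaseRDD.filter(...) that reads cf:t, followed by mapToPair) avoids creating the null placeholders in the first place, so neither a null check nor distinct() is needed just to hide them. The sketch below models that order of operations locally with java.util.stream rather than Spark; the Row class and its fields are hypothetical stand-ins for an HBase Result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Local stream analog (not Spark) of filtering on the type column *before* mapping,
// so no null placeholder is ever created.
class FilterThenMapDemo {

    // Hypothetical stand-in for an HBase row: a row key plus the value of column cf:t.
    static final class Row {
        final String key;
        final String type;

        Row(String key, String type) {
            this.key = key;
            this.type = type;
        }
    }

    // Keep only rows of the requested type, then map them to their keys.
    // wanted.equals(r.type) is null-safe when a row has no type value.
    static List<String> keysOfType(List<Row> rows, String wanted) {
        return rows.stream()
                .filter(r -> wanted.equals(r.type))
                .map(r -> r.key)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("r1", "post"), new Row("r2", "comment"),
                new Row("r3", "post"), new Row("r4", "reply"), new Row("r5", null));
        // One filter per type over the same source; no nulls appear in any result.
        for (String type : Arrays.asList("post", "comment", "reply")) {
            System.out.println(type + " -> " + keysOfType(rows, type));
        }
    }
}
```

Each per-type result contains only matching keys, so the count equals the number of rows of that type without any cleanup step.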