Reputation: 221
I am a newbie on Stack Overflow and to Spark; basically I am doing an RDD transformation.
My input data:
278222631,2763985,10.02.12,01.01.53,Whatsup,NA,Email,Halter,wagen,28.06.12,313657794,VW,er,i,B,0,23.11.11,234
298106482,2780663,22.02.12,22.02.12,Whatsup,NA,WWW,Halter,wagen,26.06.12,284788860,VW,er,i,B,0,02.06.04,123
My RDD format:
val dateCov: RDD[(Long, Long, String, String, String, String, String, String, String, String, Long, String, String, String, String, String, String, Long)]
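For context, a hypothetical parser sketch showing how one of the sample input lines could be turned into the typed tuple used in dateCov. The field positions and types are assumed from the RDD type above (columns 1, 2, 11 and 18 are Long, the rest String); parseLine is not part of the original code:

```scala
// Hypothetical helper: split a raw CSV line into the 18-column tuple.
// Column types are taken from the RDD[(Long, Long, String, ..., Long)] signature.
def parseLine(line: String) = {
  val f = line.split(",")
  (f(0).toLong, f(1).toLong, f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9),
   f(10).toLong, f(11), f(12), f(13), f(14), f(15), f(16), f(17).toLong)
}

val row = parseLine("278222631,2763985,10.02.12,01.01.53,Whatsup,NA,Email,Halter,wagen,28.06.12,313657794,VW,er,i,B,0,23.11.11,234")
```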
I am doing a reduceByKey transformation, mapping to [(k, k), (v)] with columns (1, 11) as the key and column 18 as the value, and then applying a function in reduceByKey.
example:
val reducedSortedRDD = dateCov
  .map(r => ((r._1, r._11), r._18))
  .reduceByKey((x, y) => math.min(x, y)) // find minimum diff
  .map(r => (r._1._1, r._1._2, r._2))
  .sortBy(_._1, true)
Question: is it possible after the reduceByKey function to get all the other columns, i.e. my reducedSortedRDD return type should be
reducedSortedRDD: RDD[(Long, Long, String, String, String, String, String, String, String, String, Long, String, String, String, String, String, String, Long)]
and not reducedSortedRDD: RDD[(Long, Long, Long)] as in this case? Basically I want all the other columns back after the reduceByKey transformation. I am using Spark 1.4.
Upvotes: 1
Views: 1387
Reputation: 13154
As far as I know, you either need to bring all the columns along in your reduceByKey function (keeping in mind the overhead of shuffling the extra data), or alternatively you may be able to join the reducedSortedRDD with your original data.
To bring all columns along, you would do something like this:
val reducedSortedRDD = dateCov
  .map(r => ((r._1, r._11), (r._18, r._2, r._3, ..., r._17)))
  .reduceByKey((value1, value2) => if (value1._1 < value2._1) value1 else value2)
  .map{ case (key, value) => (key._1, key._2, value._2, value._3, ..., value._17, value._1) }
  .sortBy(_._1, true)
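The "carry the other columns in the value" pattern above can be sketched on plain Scala collections (no SparkContext needed), using a hypothetical 4-column row (key1, key2, payload, diff) instead of the full 18-column tuple; groupBy stands in for the shuffle, and the reduce function is the same min-by-diff comparison:

```scala
// Hypothetical 4-column rows: (key1, key2, payload, diff)
val rows = Seq(
  (1L, 10L, "a", 5L),
  (1L, 10L, "b", 3L),  // smaller diff for key (1, 10), should win
  (2L, 20L, "c", 7L)
)

val reduced = rows
  .map { case (k1, k2, payload, diff) => ((k1, k2), (diff, payload)) }
  .groupBy(_._1)                                      // stand-in for the shuffle
  .map { case (key, grouped) =>
    val (diff, payload) = grouped.map(_._2)
      .reduce((a, b) => if (a._1 < b._1) a else b)    // same reduce function
    (key._1, key._2, payload, diff)                   // payload survives the reduce
  }
  .toSeq.sortBy(_._1)
```

Note that the winning row keeps its extra column ("b") intact, which is exactly what carrying the columns through reduceByKey buys you.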
To do the join, it would look something like this:
val keyValuedDateCov = dateCov
.map(r => ((r._1, r._11, r._18), (r._1, r._2,r._3, ...., r._17)))
val reducedRDD = dateCov
.map(r => ((r._1, r._11), r._18))
.reduceByKey((x, y) => math.min(x, y)) // find minimum diff
  .map{ case (key, value) => ((key._1, key._2, value), ()) } // dummy value, discarded after the join
val reducedSortedRDD = reducedRDD
.join(keyValuedDateCov)
.map{case(key, (_, original)) => (key._1, key._2, original._1, original._2, original._3, ..., original._17, key._3)}
.sortBy(_._1, true)
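The join-back idea can likewise be sketched on plain collections, again with a hypothetical 4-column row (key1, key2, payload, diff): first reduce to the winning (key1, key2, minDiff) triples, then recover the full rows by matching on those triples (a filter plays the role of the RDD join here):

```scala
// Hypothetical 4-column rows: (key1, key2, payload, diff)
val data = Seq(
  (1L, 10L, "a", 5L),
  (1L, 10L, "b", 3L),
  (2L, 20L, "c", 7L)
)

// Step 1: reduce to the minimum diff per key pair
val minByKey = data
  .map { case (k1, k2, _, diff) => ((k1, k2), diff) }
  .groupBy(_._1)
  .map { case (key, g) => (key, g.map(_._2).min) }   // find minimum diff

// Step 2: "join" back, keeping only rows whose diff matches the winner
val joined = data.filter { case (k1, k2, _, diff) => minByKey((k1, k2)) == diff }
```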
The join version has a weakness: if multiple rows in the original data have exactly the same values in columns 1, 11 and 18, then the end result will also contain multiple rows with those values and thus will not be properly reduced. If the data is guaranteed not to have multiple rows with the same values in these columns, there should be no problem.
Upvotes: 3