I'm doing a groupBy to calculate a value, but when I group, I lose all the fields that are not among the grouping keys:
filtered.filterNot('site) {s:String => ...}
.filterNot('date) {s:String => ...}
val aggr = filtered.groupBy('id, 'contentHost) { group =>
  group.min('timestamp -> 'min)
  // how do I keep the original fields (e.g. site, date)?
}
aggr.write(Tsv(...)) // e.g. the "site" field won't be in the output
In Pig, it would be something like this:
aggr = group filtered by (id, contentHost);
result = foreach aggr {
  generate flatten(filtered), -- how do I do this in Scalding?
    MIN(filtered.timestamp) as min_timestamp;
}
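To pin down the output the Pig script describes (every original row, each paired with its group's minimum timestamp), here is a plain Scala collections sketch of those semantics. It is not Scalding code, and the `Row` class and sample values are hypothetical. It computes the per-key minimum first and then joins it back onto the original rows. In the fields API the analogous step would presumably be joining the grouped result back onto `filtered` on `'id` and `'contentHost` (for example with `joinWithSmaller`, renaming the key fields first to avoid a field-name clash); that Scalding variant is not shown here and would need checking against your Scalding version.

```scala
// Plain Scala collections sketch (not Scalding) of "group, aggregate, flatten":
// every original row is kept and paired with its group's minimum timestamp.
// Row and the sample data below are hypothetical.
case class Row(id: String, contentHost: String, site: String, date: String, timestamp: Long)

def withGroupMin(rows: Seq[Row]): Seq[(Row, Long)] = {
  // Step 1: aggregate per (id, contentHost). Like group.min('timestamp -> 'min),
  // this result only holds the keys and the minimum; site and date are gone.
  val minPerKey: Map[(String, String), Long] =
    rows
      .groupBy(r => (r.id, r.contentHost))
      .map { case (key, group) => key -> group.map(_.timestamp).min }

  // Step 2: join the aggregate back on the same keys, so each original row
  // (with all of its fields) is paired with its group's minimum.
  rows.map(r => (r, minPerKey((r.id, r.contentHost))))
}

val sample = Seq(
  Row("a", "h1", "us", "2014-04-14", 30L),
  Row("a", "h1", "uk", "2014-04-15", 10L),
  Row("b", "h2", "de", "2014-04-14", 20L)
)

val joined = withGroupMin(sample)
```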
I had the same problem with the tuple API and could only solve it by using the typed API.
You can either use Scala tuples or define your own case class outside your job. E.g.:
import com.twitter.scalding._

// Defined outside the Job; timestamp is needed to pick the earliest record.
case class Data(id: String, site: String, date: String, contentHost: String, timestamp: Long)
Then you'd process it like this (inside your Job):
val filtered: TypedPipe[Data] =
  TypedPipe.from(Seq(Data("...", "...", "2014-04-14", "...", 0L))) // placeholder values
filtered
  .filterNot(data => data.site == "fr")
  .filterNot(data => data.date == "2014-02-01")
  .groupBy(data => (data.id, data.contentHost)) // (String, String) -> Data
  .minBy(_.timestamp) // keeps the whole Data record with the smallest timestamp
  .toTypedPipe
  .write(TypedTsv[((String, String), Data)]("/path/"))
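The reason the typed version keeps every field is that the grouped value is the whole `Data` record, so `minBy` returns a complete row instead of a single aggregated column. The sketch below shows the same semantics with plain Scala collections rather than Scalding; the `timestamp: Long` field and the sample records are assumptions for illustration.

```scala
// Plain Scala collections sketch (not Scalding) of the typed approach:
// group whole records by (id, contentHost) and keep the record with the
// smallest timestamp, so every field of that record survives.
// The timestamp field and the sample data are assumptions for illustration.
case class Data(id: String, site: String, date: String, contentHost: String, timestamp: Long)

def earliestPerKey(data: Seq[Data]): Map[(String, String), Data] =
  data
    .filterNot(_.site == "fr")
    .filterNot(_.date == "2014-02-01")
    .groupBy(d => (d.id, d.contentHost))
    // Like minBy on a grouped pipe: compare by one field, keep the whole value.
    .map { case (key, group) => key -> group.minBy(_.timestamp) }

val records = Seq(
  Data("a", "us", "2014-04-14", "h1", 30L),
  Data("a", "uk", "2014-04-15", "h1", 10L),
  Data("a", "fr", "2014-04-16", "h1", 5L),
  Data("b", "de", "2014-04-14", "h2", 20L)
)

val earliest = earliestPerKey(records)
```

Unlike the Pig script, this keeps one row per key (the earliest one). If every row must be annotated with its group's minimum, the aggregate has to be joined back onto the original records instead.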