Miguel Ping

Reputation: 18347

Scalding: retaining all fields after groupBy

I'm doing a groupBy to calculate a value, but when I group, I lose every field that is not one of the grouping keys:

val filtered = input // input: the source pipe being filtered (name assumed)
  .filterNot('site) { s: String => ... }
  .filterNot('date) { s: String => ... }

val aggr = filtered.groupBy('id, 'contentHost) { group =>
  group.min('timestamp -> 'min)
  // how do I keep the original fields? (e.g. site, date)
}

aggr.write(Tsv(...)) // e.g. the field "site" won't be here

In Pig, it would be something like this:

aggr = group filtered by (id, contentHost);

result = foreach aggr {
  generate flatten(filtered), -- how do I do this in Scalding?
           MIN(filtered.timestamp) as min;
}
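To pin down what the Pig script produces, here is a minimal in-memory sketch using ordinary Scala collections (not Scalding). It assumes a hypothetical `Record` type with a `Long` timestamp; the point is that `flatten(filtered)` keeps every input record and pairs it with its group's minimum.

```scala
// Plain-Scala, in-memory model of the Pig script's semantics (not Scalding code).
// Record and its field types are hypothetical stand-ins for the real input schema.
final case class Record(id: String, site: String, date: String,
                        contentHost: String, timestamp: Long)

object PigFlattenModel {
  // group ... by (id, contentHost), then for each group:
  //   generate flatten(filtered), MIN(filtered.timestamp) as min
  // Every original record survives, paired with the minimum of its group.
  def flattenWithMin(records: Seq[Record]): Seq[(Record, Long)] =
    records
      .groupBy(r => (r.id, r.contentHost))
      .values
      .toSeq
      .flatMap { group =>
        val groupMin = group.map(_.timestamp).min
        group.map(r => (r, groupMin))
      }
}
```

Because `groupBy` returns a `Map`, the output order is unspecified, just as Pig makes no ordering promise without an explicit `ORDER`.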

Upvotes: 3

Views: 1276

Answers (1)

Marius Soutier

Reputation: 11274

I had the same problem with the tuple API and could only solve it by using the typed API.

You can either use Scala tuples or define your own case class outside your job. E.g.:

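import com.twitter.scalding._

case class Data(id: String, site: String, date: String, contentHost: String, timestamp: Long)

Then you'd process it like this (inside your Job, since write needs the Job's implicit flowDef and mode):

// placeholder values; named arguments keep the date in the date field
val filtered: TypedPipe[Data] = TypedPipe.from(Seq(
  Data(id = "...", site = "...", date = "2014-04-14", contentHost = "...", timestamp = 0L)))

filtered
  .filterNot(data => data.site == "fr")
  .filterNot(data => data.date == "2014-02-01")
  .groupBy(data => (data.id, data.contentHost)) // (String, String) -> Data
  .minBy(_.timestamp) // keeps the whole Data record with the smallest timestamp per key
  .toTypedPipe
  .write(TypedTsv[((String, String), Data)]("/path/"))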
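What makes this work is that `minBy` returns one entire record per key rather than a single aggregated column, so `site` and `date` survive the grouping. Below is a minimal in-memory sketch of that behaviour with ordinary Scala collections (not Scalding), plus a flattening step so each field lands in its own TSV column. `LogRecord`, the `Long` timestamp, and the five-column layout are illustrative assumptions.

```scala
// Plain-Scala, in-memory model of the typed pipeline's semantics (not Scalding code).
// LogRecord, its timestamp field, and the column layout are illustrative assumptions.
final case class LogRecord(id: String, site: String, date: String,
                           contentHost: String, timestamp: Long)

object TypedMinModel {
  // Mirrors groupBy(key).minBy(_.timestamp): one whole record per key,
  // so non-key fields such as site and date are kept.
  def earliestPerKey(records: Seq[LogRecord]): Map[(String, String), LogRecord] =
    records
      .groupBy(r => (r.id, r.contentHost))
      .map { case (key, group) => key -> group.minBy(_.timestamp) }

  // Flattens each (key, record) pair into one tab-separated line, sorted by key,
  // so every original field gets its own column instead of a nested toString.
  def toTsvLines(byKey: Map[(String, String), LogRecord]): Seq[String] =
    byKey.toSeq
      .sortBy(_._1)
      .map { case ((id, host), r) =>
        Seq(id, host, r.site, r.date, r.timestamp.toString).mkString("\t")
      }
}
```

In the Scalding job itself, the analogous step would presumably be a `.map { case ((id, host), d) => ... }` on the `TypedPipe` before `write`, with a matching flat tuple type for `TypedTsv`; treat that as an untested suggestion.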

Upvotes: 4
