Reputation: 15103
In older versions of Scalding, counters had not yet been introduced into its API. Hadoop Counters In Scalding suggests how to fall back to Cascading counters in Scalding:
def addCounter(pipe : Pipe, group : String, counter : String) = {
  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {
      def operate(flowProcess : FlowProcess[_],
                  functionCall : FunctionCall[Any]) {
        try {
          flowProcess.asInstanceOf[HadoopFlowProcess]
            .increment(group, counter, 1L)
          functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
        } catch {
          case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
        }
      }.discard('addCounter)
    }
  )
}
However, when I tried that, I get:
Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
 ^
Am I missing something? The Scalding version I use is 0.8.7.
Upvotes: 0
Views: 202
Reputation: 4348
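.discard is a Scalding pipe operation (a method on RichPipe), so it should be chained at the same level as .each, the other Scalding operation in the code block. Try putting it after the last closing parenthesis ")" (the second-to-last line of the code you posted). In your version, the "}" directly before .discard closes the body of operate, so Scala finds a method call right after a method definition and reports "';' expected but '.' found".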
Here, operations are chained on the RichPipe: first each, and then discard:
pipe.each(...)(fields => ...).discard(...)
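Applying that fix to the function from the question gives roughly the following. This is a sketch, not tested against 0.8.7: it assumes Scalding 0.8.x with Cascading 2.x on the classpath, and assumes the implicit conversions (Pipe to RichPipe, Unit/Symbol to Fields) come from com.twitter.scalding.Dsl._ when the code lives outside a Job. The CounterHelpers object name is hypothetical. One deliberate deviation from the question's code: the placeholder tuple is emitted outside the try, so in local mode (where the cast fails) each input tuple still passes through instead of being silently dropped.

```scala
// Assumes Scalding 0.8.x with Cascading 2.x on the classpath.
import cascading.flow.FlowProcess
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.operation.{BaseOperation, Function, FunctionCall}
import cascading.pipe.Pipe
import cascading.tuple.Tuple
// Assumption: Dsl._ supplies the Pipe -> RichPipe and Unit/Symbol -> Fields
// implicit conversions outside a Job (inside a Job they are already in scope).
import com.twitter.scalding.Dsl._

// Hypothetical holder object; put addCounter wherever suits your code.
object CounterHelpers {
  def addCounter(pipe: Pipe, group: String, counter: String): Pipe =
    pipe.each(() -> ('addCounter))(fields =>
      new BaseOperation[Any](fields) with Function[Any] {
        def operate(flowProcess: FlowProcess[_],
                    functionCall: FunctionCall[Any]): Unit = {
          try {
            flowProcess.asInstanceOf[HadoopFlowProcess]
              .increment(group, counter, 1L)
          } catch {
            // HadoopFlowProcess is not available in local mode; skip the counter.
            case _: ClassCastException => ()
          }
          // Emit one placeholder value for the temporary 'addCounter field.
          // Doing this outside the try keeps the tuple even when the cast fails.
          functionCall.getOutputCollector.add(new Tuple(new Array[Object](1): _*))
        }
      }
    ).discard('addCounter) // chained on the Pipe returned by each(...)(...)
}
```

Because discard is applied to the Pipe that each returns, the temporary 'addCounter field is removed before any downstream operation sees it.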
Upvotes: 2