Jas

Scalding (older versions): counters based on Cascading

Older versions of Scalding did not yet expose counters in their API. The post "Hadoop Counters In Scalding" suggests how to fall back to Cascading counters in Scalding:

def addCounter(pipe : Pipe, group : String, counter : String) = {

  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {

      def operate(flowProcess : FlowProcess[_], 
        functionCall : FunctionCall[Any]) {

          try {
            flowProcess.asInstanceOf[HadoopFlowProcess]
              .increment(group, counter, 1L)
            functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
          } catch {
            case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
          }
      }.discard('addCounter)
    }
  )
}

However, when I tried that, I got:

Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^

Am I missing something? I am using Scalding version 0.8.7.

Answers (1)

mohit6up

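.discard is a Scalding (RichPipe) method and should therefore be at the same level as .each, the other Scalding call in the code block. Try moving it after the last closing parenthesis ")" (the second-to-last line of the code you posted). In its current position, .discard follows the closing brace of the operate method body (declared with procedure syntax) inside the anonymous class, where the parser expects the end of the definition, which is why you get "';' expected but '.' found".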

Here, the operations are chained on the RichPipe: first each, then discard:

pipe.each(...)(...).discard(...)
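The placement can be illustrated without a Hadoop setup. The sketch below is a hypothetical, dependency-free model: MiniPipe, RowFunction, Counter, each, and discard are made-up stand-ins that only mimic the shape of the Scalding calls in the question (a curried each followed by discard). They are not the real Scalding or Cascading API. The point it shows is that discard is called on the pipe returned by each(...)(...), after the closing parenthesis, not inside the anonymous class.

```scala
// Hypothetical stand-ins that only mimic the *shape* of Scalding's RichPipe
// calls (a curried each followed by discard); NOT the real Scalding API.
trait RowFunction {
  def operate(row: Map[String, Any]): Map[String, Any]
}

// Hypothetical stand-in for a Hadoop counter.
final class Counter {
  var value = 0L
  def increment(n: Long): Unit = value += n
}

final case class MiniPipe(rows: List[Map[String, Any]]) {
  // Curried like RichPipe.each: the output field first, then a factory that
  // builds the per-row function for that field.
  def each(outField: String)(fn: String => RowFunction): MiniPipe = {
    val f = fn(outField)
    MiniPipe(rows.map(f.operate))
  }

  // Removes the named fields from every row, like RichPipe.discard.
  def discard(fields: String*): MiniPipe =
    MiniPipe(rows.map(row => row -- fields))
}

object CounterSketch {
  def addCounter(pipe: MiniPipe, counter: Counter): MiniPipe =
    pipe.each("addCounter")(field =>
      new RowFunction {
        def operate(row: Map[String, Any]): Map[String, Any] = {
          counter.increment(1L)
          row + (field -> "placeholder") // dummy output field
        }
        // In the question, .discard(...) was attached here, at the end of
        // operate's body inside this class, instead of to each's result.
      }
    ).discard("addCounter") // chained on the result of each(...)(...)

  def main(args: Array[String]): Unit = {
    val counter = new Counter
    val out = addCounter(MiniPipe(List(Map("word" -> "a"), Map("word" -> "b"))), counter)
    println(out.rows)
    println(counter.value)
  }
}
```

Running main prints the rows without the temporary addCounter field, and the counter equals the number of rows processed (2 here).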
