Reputation: 6617
I am using the Google App Engine MapReduce library to process my data. During processing, the mapper function increments a counter, but I don't know how to read the counter results in the finalized method.
def mapper(obj):
    yield obj
    yield operation.counters.Increment("process-obj")

class Test(base_handler.PipelineBase):
    """A pipeline to ingest log as CSV in Google Storage
    """
    def run(self, setting_id):
        filepath = yield mapreduce_pipeline.MapperPipeline(
            "test",
            "mapper",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
            },
            shards=10
        )

    def finalized(self):
        # how to read the counter process-obj
        # how to get the setting_id
        pass
Upvotes: 1
Views: 568
Reputation: 126
Named outputs are probably what you are looking for. You can find more details here.
Here is your code using named outputs to get back the various counters, including the one you defined:
from mapreduce import base_handler, mapreduce_pipeline, operation

def mapper(obj):
    yield obj
    # Increment the custom counter once per processed entity.
    yield operation.counters.Increment("process-obj")

class Test(base_handler.PipelineBase):
    """A pipeline to ingest log as CSV in Google Storage
    """
    # Declare a named output slot that will carry the counters.
    output_names = ['counters']

    def run(self, setting_id):
        results = yield mapreduce_pipeline.MapperPipeline(
            "test",
            "mapper",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
            },
            shards=10
        )
        # Forward the mapper job's counters to a child pipeline that fills the slot.
        yield MapreduceResult(results.counters)

    def finalized(self):
        print 'Counters here: ', self.outputs.counters

class MapreduceResult(base_handler.PipelineBase):
    def run(self, counters):
        # Copy the resolved counters into the named output slot.
        self.fill(self.outputs.counters, counters)
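To illustrate the idea behind the code above, here is a minimal stand-alone sketch of how counter increments yielded by a mapper can be summed across shards and handed back as a single result. It is a toy model and does not use the MapReduce or Pipeline libraries: `Increment`, `run_shard`, and `run_job` are hypothetical names invented for this example, and the real library may aggregate counters differently.

```python
from collections import Counter


class Increment(object):
    """Hypothetical stand-in for a counter-increment operation."""

    def __init__(self, name, delta=1):
        self.name = name
        self.delta = delta


def mapper(obj):
    # Emit the object itself, then one counter increment per object.
    yield obj
    yield Increment("process-obj")


def run_shard(items):
    """Run the mapper over one shard; return (outputs, shard-local counters)."""
    outputs = []
    counters = Counter()
    for item in items:
        for emitted in mapper(item):
            if isinstance(emitted, Increment):
                counters[emitted.name] += emitted.delta
            else:
                outputs.append(emitted)
    return outputs, counters


def run_job(items, shards=10):
    """Split items across shards and sum the per-shard counters."""
    outputs = []
    totals = Counter()
    for i in range(shards):
        shard_outputs, shard_counters = run_shard(items[i::shards])
        outputs.extend(shard_outputs)
        totals.update(shard_counters)  # adds counts key by key
    return outputs, dict(totals)
```

In this sketch the dictionary returned by `run_job` plays the role that the `counters` named output plays in the pipeline: one aggregated value that is available only after every shard has finished.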
Upvotes: 2