The beam.DoFn is defined as below, following this example with Metrics.counter:
import json

import apache_beam as beam
from apache_beam.metrics import Metrics


class ParseAndFilterFn(beam.DoFn):
    def __init__(self):
        super(ParseAndFilterFn, self).__init__()
        # Counter for lines that fail to parse as JSON
        self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

    def process(self, element):
        text_line = element.strip()
        data = {}
        try:
            data = json.loads(text_line.decode('utf-8'))
            yield data['id']
        except Exception as ex:
            print("Parse json exception of ParseAndFilterFn:", ex)
            self.num_parse_errors.inc()
When json.loads raises an error, this error comes up:
AttributeError: 'ParseAndFilterFn' object has no attribute 'num_parse_errors' [while running 'ParseAndFilterFn']
What is wrong with my code, or is there anything I am missing?
Beam version: 2.14.0
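One way to narrow this down, independent of Beam, might be to check whether `__init__` actually ran on the instance that `process` is called on. The sketch below uses only the standard library. `FakeCounter` and `ParseFn` are hypothetical stand-ins, not Beam classes, and it is only an assumption (not confirmed for Beam 2.14.0) that the instance used at run time was rebuilt without `__init__`. It shows that an instance created without `__init__` lacks the attribute, and that creating the counter lazily avoids the error.

```python
import json


class FakeCounter(object):
    """Hypothetical stand-in for a Beam counter; not part of apache_beam."""

    def __init__(self):
        self.value = 0

    def inc(self):
        self.value += 1


class ParseFn(object):
    """Hypothetical stand-in for the DoFn in the question."""

    def __init__(self):
        self.num_parse_errors = FakeCounter()

    def _errors(self):
        # Create the counter on first use if __init__ never ran.
        if not hasattr(self, 'num_parse_errors'):
            self.num_parse_errors = FakeCounter()
        return self.num_parse_errors

    def process(self, element):
        try:
            yield json.loads(element)['id']
        except Exception:
            self._errors().inc()


# Build an instance without calling __init__ (the assumed failure mode).
bare = ParseFn.__new__(ParseFn)
missing_before = not hasattr(bare, 'num_parse_errors')

good = list(bare.process('{"id": 7}'))   # parses fine, yields the id
bad = list(bare.process('not json'))     # fails to parse, counted as an error
error_count = bare.num_parse_errors.value
```

If the same `hasattr` check inside the real `process` method reports the attribute as missing, that would suggest the problem is in how the DoFn instance is constructed rather than in `Metrics.counter` itself.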