I am trying to understand the lifecycle of a DoFn in more detail. I've added this counter in my DoFn's constructor:
> self.counter = Metrics.counter(self.__class__, 'counts')
and I increment it in the DoFn's process() method:
> self.counter.inc()
For simplicity's sake, the goal is to count the total number of elements read over the whole stream, i.e. to persist the metric across DoFn invocations. However, when I advance the watermark / processing time, the counter value appears to be reset. Log for reference:
> pardo start
> bundle started
> incremented counter.
> bundle started
> incremented counter.
> counter value: 1
I would expect this counter to be 2. If I send in a stream of several elements with no artificial gap in time, the counter increments as expected. Why does it reset when the watermark advances? Aren't metrics supposed to be used for aggregate statistics?
The stream is produced by a TestStream that emits one set of elements at time 0, advances the watermark, and then emits another set.
Additionally, note that the metrics ARE properly aggregated across stream inputs when I test this with the DataflowRunner, so the issue seems specific to the DirectRunner. Thank you, and please let me know if I can clarify anything.
This sounds like it is likely a bug in the local runner. In general, the Python local runner is unfortunately not as reliable as some other runners and is primarily meant to serve as a test runner. You could consider trying the Prism runner (https://beam.apache.org/documentation/runners/prism/), a new portable local runner that should be more reliable and is receiving greater investment from the community.
Either way, if you run into problems, I'd recommend opening an issue in the Beam repo: https://github.com/apache/beam/issues/new/choose