Lemon

Reputation: 53

Combiner functions seemingly not emitting correct results

So I'm working on a test streaming case: reading from Pub/Sub and, for now, sending to stdout to get some visibility into the pipeline and transforms.

I'm getting some unusual output and suspect I'm missing something, so I'm hoping someone can help.

Here's my code (stripped back for debugging):

with beam.Pipeline(options=opts) as p:
    (
        p
        | ReadFromPubSub(topic=topic_name,
                         timestamp_attribute='timestamp')
        | beam.WindowInto(beam.window.FixedWindows(beam.window.Duration(5)),
                          trigger=beam.trigger.AfterWatermark(),
                          accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
        | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults()
        | beam.Map(print)
    )

I am generating an arbitrary number of events (currently 40) and pushing them to my topic. I can confirm from the event generation that they all reach the topic, and when I simply print the contents of the topic (using Beam), I see what I would expect.

However, I wanted to try some basic window aggregation, and using both beam.CombineGlobally(beam.combiners.CountCombineFn()) and beam.combiners.Count.Globally(), I notice two things happening (not strictly at the same time).
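For what it's worth, both of those transforms count elements via the same CombineFn lifecycle (create an accumulator per bundle, add inputs, merge accumulators, extract the output). Here is a plain-Python sketch of that lifecycle, with no Beam dependency; `CountFn` is my own illustrative class, not Beam's actual `CountCombineFn` implementation:

```python
# Plain-Python sketch of the CombineFn lifecycle behind counting in Beam.
# This is illustrative only; it does not use or depend on apache_beam.
class CountFn:
    def create_accumulator(self):
        return 0

    def add_input(self, acc, element):
        # Counting ignores the element value; it just increments.
        return acc + 1

    def merge_accumulators(self, accs):
        # Per-bundle partial counts are summed into one total.
        return sum(accs)

    def extract_output(self, acc):
        return acc

fn = CountFn()
partials = []
for bundle in ([1, 2, 3], [4, 5]):  # e.g. two bundles within one window
    acc = fn.create_accumulator()
    for element in bundle:
        acc = fn.add_input(acc, element)
    partials.append(acc)

total = fn.extract_output(fn.merge_accumulators(partials))
print(total)  # 5
```

The point being: if either Beam transform emits a surprising count, the problem is almost certainly in windowing/triggering (which elements get grouped together), not in the counting logic itself.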

The first issue:

class ShowWindowing(beam.DoFn):
    def process(self, elem, window=beam.DoFn.WindowParam):
        yield (f'I am an element: {elem}\n'
               f'start window time: {window.start.to_utc_datetime()} '
               f'and the end window time: {window.end.to_utc_datetime()}')

The second issue I have (which I feel is related to the above, though I've seen it occur without the semi-grouping of elements):

I've read and re-read the docs (admittedly skimming parts of them just to find an answer), and I've referenced the katas and the Google Dataflow quest for examples and alternatives, but I cannot identify where I'm going wrong.

Thanks

Upvotes: 0

Views: 119

Answers (1)

robertwb

Reputation: 5104

I think this boils down to a TODO in the Python local runner's handling of watermarks for Pub/Sub subscriptions. Essentially, the runner assumes it has received all the data up until now, but there is still data in Pub/Sub with timestamps earlier than now(), and that data becomes late data once it is actually read.
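The effect can be sketched in plain Python (a toy model of the watermark logic, not runner code): once the watermark has advanced past a timestamp, any message still sitting in Pub/Sub with an older timestamp is classified as late when it is finally read, and the default trigger drops it.

```python
# Toy model: the runner's watermark races ahead of the Pub/Sub backlog,
# so backlogged messages with older timestamps arrive "late".
def is_late(event_timestamp, watermark):
    return event_timestamp < watermark

watermark = 100                   # runner believes all data up to t=100 arrived
pubsub_backlog = [95, 98, 101]    # event timestamps still unread in Pub/Sub

late = [ts for ts in pubsub_backlog if is_late(ts, watermark)]
print(late)  # [95, 98] -> dropped by the default trigger, so counts look low
```

This is why the counts can appear wrong on the local runner even though every event demonstrably reached the topic.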

A real runner such as Dataflow won't have this issue.

Upvotes: 1
