tombrawz

Reputation: 137

How to count elements inside a time window in Apache Beam and emit the data when the count reaches a threshold?

I want to count elements per key from an unbounded source (I am using Pub/Sub as my source) within a time window, and emit the count result when it reaches some threshold. For example, I want to count the elements within a 10-minute fixed window and emit the results to another Pub/Sub topic when the count is > 5.

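    import json
    import apache_beam as beam
    from apache_beam.transforms import window

    transformation = (p
                    | beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
                    | 'parse' >> beam.Map(json.loads)  # must yield (key, value) pairs for Count.PerKey
                    | beam.WindowInto(window.FixedWindows(600))
                    | 'count' >> beam.combiners.Count.PerKey()
                    | 'filter' >> beam.Filter(lambda kv: kv[1] > 5)  # Count.PerKey emits (key, count) tuples
                    | 'encode' >> beam.Map(lambda kv: json.dumps(kv).encode('utf-8')))  # WriteToPubSub expects bytes
    transformation | beam.io.WriteToPubSub(known_args.output_topic)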

However, the results written to Pub/Sub seem to be delayed; from my estimate, they are emitted only after the window expires. What additional windowing options or code do I need to emit the result as soon as the count passes the threshold?
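
The delay is consistent with Beam's default trigger, which fires a fixed window's pane once, when the watermark passes the end of the window. The plain-Python sketch below has no Beam dependency; `window_start` and `default_trigger_output` are hypothetical helpers that model what the pipeline above emits under that trigger, using event times in seconds and ignoring watermark lag and allowed lateness:

```python
from collections import Counter

WINDOW_SIZE = 600  # seconds, matching window.FixedWindows(600)


def window_start(ts, size=WINDOW_SIZE):
    """Start of the fixed window [start, start + size) that contains ts."""
    return ts - ts % size


def default_trigger_output(events, threshold=5, size=WINDOW_SIZE):
    """Model FixedWindows + Count.PerKey + Filter(count > threshold).

    events: iterable of (event_time_seconds, key) pairs.
    Returns sorted (emit_time, key, count) tuples. Under the default trigger
    each (key, window) fires once, when the watermark reaches the window end,
    so emit_time is the window end even if the threshold was crossed early.
    Watermark lag and allowed lateness are ignored.
    """
    counts = Counter((key, window_start(ts, size)) for ts, key in events)
    return sorted(
        (start + size, key, n)
        for (key, start), n in counts.items()
        if n > threshold
    )


# Six events for key "a" in the first minute of window [0, 600), one for "b".
events = [(t, "a") for t in range(0, 60, 10)] + [(700, "b")]
print(default_trigger_output(events))  # → [(600, 'a', 6)]
```

Key "a" passes the threshold at t=50, yet its result only appears at t=600, which matches the behavior described in the question.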

Upvotes: 1

Views: 1206

Answers (1)

guillaume blaquiere

Reputation: 75790

As the documentation shows, you can add a trigger to the window.

Section 9.3 defines the AfterCount trigger, which fires each time the specified number of elements has been collected.
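
To make the AfterCount semantics concrete, here is a Beam-free sketch of what `Repeatedly(AfterCount(n))` emits for one key in one window (triggers are evaluated per key and window). The `count_panes` helper is hypothetical; it assumes in-order arrival and shows how the accumulation mode changes the count each pane carries:

```python
def count_panes(num_elements, every, accumulating):
    """Model the early panes Repeatedly(AfterCount(every)) emits for one key/window.

    Returns the count carried by each pane, in firing order. With ACCUMULATING
    each pane holds the running total for the window; with DISCARDING it holds
    only the elements seen since the previous firing. Elements left over when
    the window closes are not modeled here.
    """
    panes = []
    for seen in range(every, num_elements + 1, every):
        panes.append(seen if accumulating else every)
    return panes


print(count_panes(12, 5, accumulating=True))   # → [5, 10]
print(count_panes(12, 5, accumulating=False))  # → [5, 5]
```

With ACCUMULATING and a downstream `> 5` filter, the first pane (5) is dropped and the first result appears only at 10 elements, so the trigger count and the filter threshold have to line up (for example `AfterCount(6)` with `> 5`).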

I have never written this in Python (only in Java), but the code should look something like this:

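    import json
    import apache_beam as beam
    from apache_beam.transforms import window
    from apache_beam.transforms.trigger import AccumulationMode, AfterCount, Repeatedly

    transformation = (p
                    | beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
                    | 'parse' >> beam.Map(json.loads)  # must yield (key, value) pairs
                    | beam.WindowInto(window.FixedWindows(600),
                        trigger=Repeatedly(AfterCount(5)),  # early pane every 5 elements per key
                        accumulation_mode=AccumulationMode.ACCUMULATING)  # panes carry running totals
                    | 'count' >> beam.combiners.Count.PerKey()
                    | 'filter' >> beam.Filter(lambda kv: kv[1] > 5)  # (key, count) tuples
                    | 'encode' >> beam.Map(lambda kv: json.dumps(kv).encode('utf-8')))  # bytes for Pub/Sub
    transformation | beam.io.WriteToPubSub(known_args.output_topic)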

To make sure all elements are processed (for example, if fewer than 5 elements remain at the end of the window), you need to build a composite trigger, like this:

    trigger=Repeatedly(
        AfterAny(AfterCount(5), AfterProcessingTime(10 * 60))),
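
The composite trigger can be modeled the same way. This sketch uses a hypothetical `composite_firings` helper with processing times in seconds. It assumes that `AfterProcessingTime(delay)` measures the delay from the first element of the current pane, as Java's `AfterProcessingTime.pastFirstElementInPane()` does, and it reports the discarding view of each pane:

```python
def composite_firings(arrivals, count=5, delay=600):
    """Model Repeatedly(AfterAny(AfterCount(count), AfterProcessingTime(delay))).

    arrivals: sorted processing-time arrival seconds of elements for one key
    in one window. Returns (fire_time, elements_in_pane) pairs. A pane fires
    when `count` elements have arrived since the last firing, or `delay`
    seconds after the pane's first element arrived, whichever comes first.
    Assumes the window stays open; window expiry is not modeled.
    """
    firings = []
    pane_start, pane_size = None, 0
    for t in arrivals:
        # The processing-time timer may have expired before this element arrived.
        if pane_size and pane_start + delay <= t:
            firings.append((pane_start + delay, pane_size))
            pane_start, pane_size = None, 0
        if pane_size == 0:
            pane_start = t  # first element of a new pane starts the timer
        pane_size += 1
        if pane_size == count:  # AfterCount is satisfied first
            firings.append((t, pane_size))
            pane_start, pane_size = None, 0
    if pane_size:  # leftover elements are flushed when the timer expires
        firings.append((pane_start + delay, pane_size))
    return firings


print(composite_firings([0, 1, 2, 3, 4, 100, 200]))  # → [(4, 5), (700, 2)]
```

The first five elements fire immediately through AfterCount, while the two stragglers are still emitted once the processing-time delay runs out.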

Upvotes: 4
