0_0
0_0

Reputation: 574

How does Apache Beam's CombineValues operate over elements when executing arithmetic operations

This is a bit of a contrived example, but I have been exploring the docs for CombineValues and wish understand what I'm seeing.

If I combine values and perform some arithmetic operations on the values (the goal is to calculate percentages of keys present in a bounded stream), then I need to use the AverageFn (as defined in Example 8 in docs and provided in the source code example snippets).

However, this (based on Example 5) does not work:

with beam.Pipeline() as pipeline:

  counts = ( pipeline
             | 'create' >>  beam.Create(['xxx'])
             | 'key it' >> beam.Map(lambda elem: (elem, 1))
             | 'combine' >> beam.CombinePerKey(
                 lambda values: sum(values)/2
                 )
             | 'print' >> beam.Map(print)
           )

as it produces

('xxx', 0.25)

I ultimately wanted to compute the count via

  totals = pipeline | 'Count elements' >> beam.combiners.Count.Globally()

and then use the singleton approach they suggest (where I provide beam.pvalue.AsSingleton(totals) to beam.CombineValues).

My question is, why does CombineValues appear to execute twice (probably going to be some facepalming)?

Upvotes: 0

Views: 444

Answers (1)

Iñigo
Iñigo

Reputation: 2680

The reason the combiner is being called twice is because of the MapReduce phases. Since the function you are using (halving the mean) is not associative, you'd need a an "advance combiner" as in the example 8 you mention.

What is happening in your current code is, from (xxx, 1) calculate the half mean (xxx, 0.5) and then, when merging the values, it halves it again, making (xxx, 0.25).

In this answer I explain a similar concept.

For your particular case, as mentioned, you need "advance combiners"

 with beam.Pipeline() as pipeline:

    def combiner(elements):
        print(elements)
        return sum(elements)/2

    class HalfMean(beam.CombineFn):
        def create_accumulator(self):
            # Tuple of sum, count
            return (0, 0)

        def add_input(self, accumulator, input):
            # Add the current element to sum, add one to count
            new_tuple = (accumulator[0] + input, accumulator[1] + 1)
            return new_tuple

        def merge_accumulators(self, accumulators):
            # Join all accumulators
            partial_sums = [x[0] for x in accumulators]
            partial_counts = [x[1] for x in accumulators]
            merged_tuple = (sum(partial_sums), sum(partial_counts))
            return merged_tuple

        def extract_output(self, sum_count_tuple):
            # Now we can output half of the mean
            mean = sum_count_tuple[0]/sum_count_tuple[1]
            return mean/2

    counts = ( pipeline
             | 'create' >> beam.Create(['xxx'])
             | 'key it' >> beam.Map(lambda elem: (elem, 1))
             #| 'combine' >> beam.CombinePerKey(combiner)
             | 'advance combine' >> beam.CombinePerKey(HalfMean())
             | 'print' >> beam.Map(print)
           )

I'm leaving your old combiner with a print so you see what's happening.

Anyhow, that is still not a CombineValues but a CombinerPerKey. CombineValues takes a key value pair on which the value is an iterator, and applies the combiner to it. In the following case, the elements that it's taking are ('a', [1, 2, 3]) and ('b', [10]). Here you have the example

    kvs = [('a', 1),
           ('a', 2),
           ('a', 3),
           ('b', 10),
   ]

    combine_values = (pipeline
             | 'create_kvs' >> beam.Create(kvs)
             | 'gbk' >> beam.GroupByKey()
             | 'combine values' >> beam.CombineValues(HalfMean())
             | 'print cv' >> beam.Map(print)
           )

Upvotes: 2

Related Questions