dendog
dendog

Reputation: 3328

Aggregating data in a window in apache beam

I am receiving a stream of a complex and nested JSON object as my input to my pipeline.

My goal is to create small batches to feed off to another pubsub topic for processing downstream. I am struggling with the beam.beam.GroupByKey() function - from what I have read this is the correct method to try and aggregate.

A simplified example, the input events:

{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a'], url: 'websiteB.com' }

I am trying to create the following:

{
'websiteA.com': {a:2, b:2, c:2},
'websiteB.com': {a:1},
}

My issue lies in trying to group on anything more that the simplest tuple throws a ValueError: too many values to unpack.

I could run this in two steps, but from my reading using beam.GroupByKey() is expensive and therefore should be minimised.

EDIT based on answer from @Cubez.

This is my combine function which seems to half work :(

class MyCustomCombiner(beam.CombineFn):
  def create_accumulator(self):
    logging.info('accum_created') #Logs OK!
    return {}

  def add_input(self, counts, input):
    counts = {}
    for i in input:
      counts[i] = 1
    logging.info(counts) #Logs OK!
    return counts

  def merge_accumulators(self, accumulators):
    logging.info('accumcalled') #never logs anything
    c = collections.Counter()
    for d in accumulators:
      c.update(d)
    logging.info('accum: %s', accumulators) #never logs anything
    return dict(c)

  def extract_output(self, counts):
    logging.info('Counts2: %s', counts) #never logs anything
    return counts

It seems past add_input nothing is being called?

Adding pipeline code:

with beam.Pipeline(argv=pipeline_args) as p:
    raw_loads_dict = (p 
      | 'ReadPubsubLoads' >> ReadFromPubSub(topic=PUBSUB_TOPIC_NAME).with_output_types(bytes)
      | 'JSONParse' >> beam.Map(lambda x: json.loads(x))
    )
    fixed_window_events = (raw_loads_dict
      | 'KeyOnUrl' >> beam.Map(lambda x: (x['client_id'], x['events']))
      | '1MinWindow' >> beam.WindowInto(window.FixedWindows(60))
      | 'CustomCombine' >> beam.CombinePerKey(MyCustomCombiner())
    )
    fixed_window_events | 'LogResults2' >> beam.ParDo(LogResults())

Upvotes: 4

Views: 3564

Answers (1)

Cubez
Cubez

Reputation: 918

This is a perfect example of needed to use combiners. These are transforms that are used to aggregate or combine collections across multiple workers. As the doc says, CombineFns work by reading in your element (beam.CombineFn.add_input), merging multiple elements (beam.CombineFn.merge_accumulators), then finally outputting the final combined value (beam.CombineFn.extract_output). See the Python docs for the parent class here.

For example, to create a combiner that outputs an average of a collection of numbers looks like this:

class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')

pc = ...
average = pc | beam.CombineGlobally(AverageFn())

For your use case, I would suggest something like this:

values = [
          {'data':['a', 'b', 'c'], 'url': 'websiteA.com'},
          {'data':['a', 'b', 'c'], 'url': 'websiteA.com'},
          {'data':['a'], 'url': 'websiteB.com'}
]

# This counts the number of elements that are the same.
def combine(counts):
  # A counter is a dictionary from keys to the number of times it has
  # seen that particular key.
  c = collections.Counter()
  for d in counts:
    c.update(d)
  return dict(c)

with beam.Pipeline(options=pipeline_options) as p:
  pc = (p
        # You should replace this step with reading data from your
        # source and transforming it to the proper format for below.
        | 'create' >> beam.Create(values)

        # This step transforms the dictionary to a tuple. For this
        # example it returns:
        # [ ('url': 'websiteA.com', 'data':['a', 'b', 'c']),
        #   ('url': 'websiteA.com', 'data':['a', 'b', 'c']),
        #   ('url': 'websiteB.com', 'data':['a'])]
        | 'url as key' >> beam.Map(lambda x: (x['url'], x['data']))

        # This is the magic that combines all elements with the same
        # URL and outputs a count based on the keys in 'data'.
        # This returns the elements:
        # [ ('url': 'websiteA.com', {'a': 2, 'b': 2, 'c': 2}),
        #   ('url': 'websiteB.com', {'a': 1})]
        | 'combine' >> beam.CombinePerKey(combine))

  # Do something with pc
  new_pc = pc | ...

Upvotes: 8

Related Questions