torblerone

Reputation: 183

CombineFn for Python dict in Apache Beam pipeline

I've been experimenting with the Apache Beam SDK in Python to write data processing pipelines.

My data mocks IoT sensor data from a Google PubSub topic that streams JSON data like this:

{"id": 1, "temperature": 12.34}
{"id": 2, "temperature": 76.54}

There are IDs ranging from 0 to 99. Reading the JSON into a Python dict is no problem.
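For reference, a minimal sketch of the parsing step (the actual pipeline reads from Pub/Sub; the helper name here is just for illustration):

```python
import json

# Each message arrives as a JSON bytestring; json.loads turns it
# into a Python dict. This step already works in my pipeline.
def parse_message(message: bytes) -> dict:
    return json.loads(message.decode("utf-8"))

reading = parse_message(b'{"id": 1, "temperature": 12.34}')
# reading == {'id': 1, 'temperature': 12.34}
```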

I created a custom CombineFn to process by CombinePerKey. I hoped that the output of my accumulator would be the calculations, grouped by the respective id fields from the dictionaries in the PCollection.

However, when the add_input method is called, it only receives the string 'temperature' instead of the whole dictionary. I also did not find any reference explaining how to tell CombinePerKey which key (the id field in my case) it should group the data by.

Maybe I've also misunderstood the concepts of CombinePerKey and CombineFn. I'd appreciate any help or hints on this. Does anyone have an example of processing JSON batches with ID-based grouping? Do I have to convert the dictionary into something else?

Upvotes: 0

Views: 1621

Answers (1)

CaptainNabla

Reputation: 1166

You need to either adjust your CombineFn or (what I would recommend) keep the CombineFn as generic as possible and map the input of CombinePerKey accordingly. I have made short examples of both cases below, based on this official Beam example.

Specific CombineFn:

import apache_beam as beam

class SpecificAverageFn(beam.CombineFn):
  def create_accumulator(self):
    sum = 0.0
    count = 0
    accumulator = sum, count
    return accumulator

  def add_input(self, accumulator, input):
    sum, count = accumulator
    extracted_input = input['temperature'] # <- this is a dict, you need to create custom code here
    return sum + extracted_input, count + 1

  def merge_accumulators(self, accumulators):
    # accumulators = [(sum1, count1), (sum2, count2), (sum3, count3), ...]
    sums, counts = zip(*accumulators)
    # sums = [sum1, sum2, sum3, ...]
    # counts = [count1, count2, count3, ...]
    return sum(sums), sum(counts)

  def extract_output(self, accumulator):
    sum, count = accumulator
    if count == 0:
      return float('NaN')
    return sum / count

with beam.Pipeline() as pipeline:
  (
    pipeline
    | "mock input" >> beam.Create([
     {'id': 1, 'temperature': 2},
     {'id': 2, 'temperature': 3},
     {'id': 2, 'temperature': 2}
    ])
    | "add key" >> beam.Map(lambda x: (x['id'], x))
    | beam.CombinePerKey(SpecificAverageFn())
    | beam.Map(print)
  )

Generic CombineFn:

import apache_beam as beam

class GenericAverageFn(beam.CombineFn):
  # everything as SpecificAverageFn, except add_input:
  def add_input(self, accumulator, input):
    sum, count = accumulator
    return sum + input, count + 1


with beam.Pipeline() as pipeline:
  iot_data = (
    pipeline
    | "mock input" >> beam.Create([
     {'id': 1, 'temperature': 2},
     {'id': 2, 'temperature': 3},
     {'id': 2, 'temperature': 2}
    ])
    | "add key" >> beam.Map(lambda x: (x['id'], x))
  )

  # repeat below for other values
  (
    iot_data
    | "extract temp" >> beam.Map(lambda x: (x[0], x[1]['temperature']))
    | beam.CombinePerKey(GenericAverageFn())
    | beam.Map(print)
  )

Both approaches return

(1, 2.0)
(2, 2.5)
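As a plain-Python sanity check (no Beam involved), computing the per-key means of the mock data by hand gives the same numbers:

```python
from collections import defaultdict

# The same mock readings as in the pipelines above.
readings = [
    {'id': 1, 'temperature': 2},
    {'id': 2, 'temperature': 3},
    {'id': 2, 'temperature': 2},
]

# Accumulate (sum, count) per id, mirroring what the CombineFn does.
sums = defaultdict(lambda: (0.0, 0))
for r in readings:
    s, c = sums[r['id']]
    sums[r['id']] = (s + r['temperature'], c + 1)

averages = {key: s / c for key, (s, c) in sums.items()}
# averages == {1: 2.0, 2: 2.5}
```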

Upvotes: 1
