Reputation: 183
I've been experimenting with the Apache Beam SDK in Python to write data processing pipelines.
My data mocks IoT sensor data from a Google PubSub topic that streams JSON data like this:
{"id": 1, "temperature": 12.34}
{"id": 2, "temperature": 76.54}
There are IDs ranging from 0 to 99. Reading the JSON into a Python dict is no problem.
I created a custom CombineFn to process the data with CombinePerKey. I hoped that the output of my accumulator would be the calculations, grouped by the respective id fields from the dictionaries in the PCollection. However, when the add_input method is called, it only receives the string 'temperature' instead of the whole dictionary. I also did not find any way to tell CombinePerKey which key (the id field in my case) it should group the data by.
Maybe I have also misunderstood the concepts of CombinePerKey and CombineFn. I'd appreciate any help or hints on this. Does someone have an example of processing JSON batches with ID-based grouping? Do I have to convert the dictionaries into something else?
Upvotes: 0
Views: 1621
Reputation: 1166
You need to either adjust your CombineFn or (what I would recommend) keep the CombineFn as generic as possible and map the input of CombinePerKey accordingly. I have made short examples of both cases below, based on this official Beam example.
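As a side note on why add_input received the string 'temperature': CombinePerKey expects each element of its input PCollection to be a (key, value) pair. If you feed it raw dicts, each two-key dict gets unpacked as a pair, and iterating a dict yields its keys, so the "value" handed to your CombineFn is literally the second key name. This is my reading of the symptom you describe, not something verified against your exact pipeline; a quick pure-Python illustration:

```python
# Unpacking a two-key dict as a pair yields its KEYS, not its values,
# because iterating over a dict produces the keys in insertion order.
element = {"id": 1, "temperature": 12.34}
k, v = element
print(k, v)  # -> id temperature
```

That is why mapping each element to an explicit (id, value) tuple first, as shown below, makes CombinePerKey behave as expected.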
Specific CombineFn:
import apache_beam as beam

class SpecificAverageFn(beam.CombineFn):
    def create_accumulator(self):
        sum = 0.0
        count = 0
        accumulator = sum, count
        return accumulator

    def add_input(self, accumulator, input):
        sum, count = accumulator
        extracted_input = input['temperature']  # <- input is the whole dict, you need custom code here
        return sum + extracted_input, count + 1

    def merge_accumulators(self, accumulators):
        # accumulators = [(sum1, count1), (sum2, count2), (sum3, count3), ...]
        sums, counts = zip(*accumulators)
        # sums = (sum1, sum2, sum3, ...)
        # counts = (count1, count2, count3, ...)
        return sum(sums), sum(counts)

    def extract_output(self, accumulator):
        sum, count = accumulator
        if count == 0:
            return float('NaN')
        return sum / count

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "mock input" >> beam.Create([
            {'id': 1, 'temperature': 2},
            {'id': 2, 'temperature': 3},
            {'id': 2, 'temperature': 2},
        ])
        | "add key" >> beam.Map(lambda x: (x['id'], x))
        | beam.CombinePerKey(SpecificAverageFn())
        | beam.Map(print)
    )
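To see what the combiner actually computes, the accumulator lifecycle for key 2 (inputs 3 and 2) can be traced by hand in plain Python, with no Beam runner involved:

```python
# Manual trace of the CombineFn lifecycle for key 2.
acc = (0.0, 0)                     # create_accumulator -> (sum, count)
acc = (acc[0] + 3, acc[1] + 1)     # add_input with temperature 3
acc = (acc[0] + 2, acc[1] + 1)     # add_input with temperature 2
sums, counts = zip(*[acc])         # merge_accumulators (only one accumulator here)
total, n = sum(sums), sum(counts)
print(total / n)                   # extract_output -> 2.5
```

On a real runner, merge_accumulators may receive several partial (sum, count) pairs from parallel workers, which is why sum and count are carried separately instead of averaging early.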
Generic CombineFn:
import apache_beam as beam

class GenericAverageFn(beam.CombineFn):
    # everything as in SpecificAverageFn, except add_input:
    def add_input(self, accumulator, input):
        sum, count = accumulator
        return sum + input, count + 1

with beam.Pipeline() as pipeline:
    iot_data = (
        pipeline
        | "mock input" >> beam.Create([
            {'id': 1, 'temperature': 2},
            {'id': 2, 'temperature': 3},
            {'id': 2, 'temperature': 2},
        ])
        | "add key" >> beam.Map(lambda x: (x['id'], x))
    )

    # repeat below for other values
    (
        iot_data
        | "extract temp" >> beam.Map(lambda x: (x[0], x[1]['temperature']))
        | beam.CombinePerKey(GenericAverageFn())
        | beam.Map(print)
    )
Both approaches return
(1, 2.0)
(2, 2.5)
Upvotes: 1