Reputation: 1448
We have a step that splits up Pubsub messages on newline in Dataflow. We have a test that passes for the code, but it seems to fail in production. Looks like we get the same Pubsub message in multiple places in the pipeline at once (to the best of my knowledge at least).
Should we have written the first test in another way? Or is this just a hard lesson learned about what not to do in Apache Beam?
import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
import unittest

class SplitUpBatches(beam.DoFn):
    def process(self, msg):
        bodies = msg.data.split('\n')
        for body in bodies:
            msg.data = body.strip()
            yield msg

class TestSplitting(unittest.TestCase):
    body = """
    first
    second
    third
    """.strip()

    def test_incorrectly_passing(self):
        """Incorrectly passing"""
        msg = PubsubMessage(self.body, {})
        with TestPipeline() as p:
            assert_that(
                p
                | beam.Create([msg])
                | "split up batches" >> beam.ParDo(SplitUpBatches())
                | "map to data" >> beam.Map(lambda m: m.data),
                equal_to(['first', 'second', 'third']))

    def test_correctly_failing(self):
        """Failing, but not using a TestPipeline"""
        msg = PubsubMessage(self.body, {})
        messages = list(SplitUpBatches().process(msg))
        bodies = [m.data for m in messages]
        self.assertEqual(bodies, ['first', 'second', 'third'])
        # => AssertionError: ['third', 'third', 'third'] != ['first', 'second', 'third']
Upvotes: 1
Views: 199
Reputation: 11021
TL;DR: Yes, this is an example of what not to do in Beam: reusing (mutating) your element objects.
In fact, Beam discourages mutating inputs and outputs of your transforms, because Beam passes/buffers those objects in various ways that can be affected if you mutate them.
The recommendation here is to create a new PubsubMessage instance for each output.
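As a rough sketch of that recommendation, the splitting logic can yield a fresh object per line instead of overwriting msg.data. The Message class below is a hypothetical stand-in for PubsubMessage so the snippet runs without Beam installed, and split_up_batches is an illustrative name rather than anything from Beam itself.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    """Hypothetical stand-in for PubsubMessage (not a Beam class)."""
    data: str
    attributes: dict = field(default_factory=dict)


def split_up_batches(msg):
    # Build a new object for every line; the input element is never mutated.
    for body in msg.data.split('\n'):
        yield Message(body.strip(), dict(msg.attributes))


original = Message("first\nsecond\nthird", {"source": "example"})
outputs = list(split_up_batches(original))
bodies = [m.data for m in outputs]
print(bodies)
```

Inside the DoFn from the question, the equivalent change would presumably be to yield PubsubMessage(body.strip(), msg.attributes) from process rather than assigning to msg.data.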
This happens due to the ways in which Beam serializes and passes data around.
You may know that Beam executes several steps together in single workers - what we call stages. Your pipeline does something like this:
read_data -> split_up_batches -> serialize all data -> perform assert
This intermediate serialize data step is an implementation detail. It exists because, for the Beam assert_that, we gather all of the data of a single PCollection onto a single machine and perform the assert there (so we need to serialize all elements and send them over to that machine). We do this with a GroupByKey operation.
When the DirectRunner receives the first yield of a PubsubMessage('first'), it serializes it and transfers it to a GroupByKey immediately, so you get the 'first', 'second', 'third' result.
When the DataflowRunner receives the first yield of a PubsubMessage('first'), it buffers it and sends elements over in batches. You get the 'third', 'third', 'third' result because serialization happens only when the buffer is transmitted, and by then your original PubsubMessage instance has been overwritten.
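The difference between the two behaviours can be imitated in plain Python. This is only a toy model, not how either runner is actually implemented: Message is a hypothetical stand-in for PubsubMessage, copy.deepcopy stands in for serialization, and consume_immediately / consume_buffered are made-up names for the two strategies described above.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class Message:
    """Hypothetical stand-in for PubsubMessage (not a Beam class)."""
    data: str
    attributes: dict = field(default_factory=dict)


def split_mutating(msg):
    # Same pattern as SplitUpBatches.process: one object is reused and overwritten.
    for body in msg.data.split('\n'):
        msg.data = body.strip()
        yield msg


def consume_immediately(outputs):
    # Snapshot ("serialize") each element as soon as it is yielded.
    return [copy.deepcopy(m).data for m in outputs]


def consume_buffered(outputs):
    # Hold references to all elements first, snapshot the batch afterwards.
    buffered = list(outputs)
    return [copy.deepcopy(m).data for m in buffered]


immediate = consume_immediately(split_mutating(Message("first\nsecond\nthird")))
buffered = consume_buffered(split_mutating(Message("first\nsecond\nthird")))
print(immediate)  # ['first', 'second', 'third']
print(buffered)   # ['third', 'third', 'third']
```

The buffered case holds three references to the same object, so every snapshot sees the last value written to it. This is also why calling list() on process directly, as in test_correctly_failing, exposes the bug.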
Upvotes: 1