Frans

Reputation: 1448

Test a step that yields the same instance multiple times

We have a Dataflow step that splits up Pubsub messages on newlines. We have a test that passes for the code, but the step seems to fail in production: as far as I can tell, the same Pubsub message ends up in multiple places in the pipeline at once.

Should we have written the first test in another way? Or is this just a hard lesson learned about what not to do in Apache Beam?

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
import unittest

class SplitUpBatches(beam.DoFn):
    def process(self, msg):
        bodies = msg.data.split('\n')
        for body in bodies:
            msg.data = body.strip()
            yield msg

class TestSplitting(unittest.TestCase):
    body = """
    first
    second
    third
    """.strip()

    def test_incorrectly_passing(self):
        """Incorrectly passing"""
        msg = PubsubMessage(self.body, {})
        with TestPipeline() as p:
            assert_that(
                p
                | beam.Create([msg])
                | "split up batches" >> beam.ParDo(SplitUpBatches())
                | "map to data" >> beam.Map(lambda m: m.data),
                equal_to(['first', 'second', 'third']))

    def test_correctly_failing(self):
        """Failing, but not using a TestPipeline"""
        msg = PubsubMessage(self.body, {})
        messages = list(SplitUpBatches().process(msg))
        bodies = [m.data for m in messages]
        self.assertEqual(bodies, ['first', 'second', 'third'])
        # => AssertionError: ['third', 'third', 'third'] != ['first', 'second', 'third']

Upvotes: 1

Views: 199

Answers (1)

Pablo

Reputation: 11021

TL;DR: Yes, this is an example of what not to do in Beam: reusing (mutating) your element objects.

In fact, Beam discourages mutating the inputs and outputs of your transforms, because it passes and buffers those objects in ways that can break if you mutate them.

The recommendation here is to create a new PubsubMessage instance for each output.
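For example, here is a minimal sketch of the DoFn rewritten to emit a fresh message per line (assuming, as in the test above, that msg.data is a str; copying the attributes over is just for illustration):

class SplitUpBatches(beam.DoFn):
    def process(self, msg):
        # Build a new PubsubMessage for every line instead of mutating `msg`,
        # so elements that have already been yielded are never overwritten.
        for body in msg.data.split('\n'):
            yield PubsubMessage(body.strip(), dict(msg.attributes))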


Detailed explanation

This happens due to the ways in which Beam serializes and passes data around.

You may know that Beam executes several steps together on a single worker - what we call stages. Your pipeline does something like this:

read_data -> split_up_batches -> serialize all data -> perform assert

This intermediate serialization step is an implementation detail. It exists because assert_that gathers all of the data of a single PCollection onto one machine to perform the assertion, so every element has to be serialized and sent over to that machine. This is done with a GroupByKey operation.

When the DirectRunner receives the first yield of a PubsubMessage('first'), it serializes it and transfers it to the GroupByKey immediately - so you get the 'first', 'second', 'third' result, because each element is serialized before the DoFn mutates the message again.

When the DataflowRunner receives the first yield of a PubsubMessage('first'), it buffers it and sends elements over in batches. You get the 'third', 'third', 'third' result, because serialization only happens when the buffer is transmitted - and by then your single PubsubMessage instance has been overwritten.
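As a quick sanity check (assuming the rewritten DoFn sketched above), the direct, non-pipeline test from the question should no longer observe the mutation:

msg = PubsubMessage('first\nsecond\nthird', {})
bodies = [m.data for m in SplitUpBatches().process(msg)]
assert bodies == ['first', 'second', 'third']  # each yield is an independent message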

Upvotes: 1
