Conall Daly
Conall Daly

Reputation: 63

How do I run Apache Beam Integration tests?

I am trying to run the game stats example pipeline and integration tests found here but I'm not sure what is the correct way to set up my local environment.

My main goal is to learn how to use the TestDataflowRunner so that I can implement integration tests for existing pipelines that I have written.

[UPDATE] I have written a basic dataflow which reads a message from PubSub and writes it to a different topic. I have an integration test that is passing using the TestDirectRunner but I am getting errors when trying to use the TestDataflowRunner

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

def run(argv=None):
  """Build and run the pipeline."""
  parser = argparse.ArgumentParser()  
  parser.add_argument('--output_topic', required=True)
  parser.add_argument('--input_subscription', required=True)

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options=pipeline_options) as p:
    # Read from PubSub into a PCollection.
    messages = (
        p |

    lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

    def format_pubsub(msg):'Format PubSub: {msg}')
        return str(msg)

    output = (
        | 'format' >> beam.Map(format_pubsub)
        | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))

    output |

if __name__ == '__main__':

from __future__ import absolute_import

import logging
import os
import time
import unittest
import uuid

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from import utils
from import PubSubMessageMatcher
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

import pipeline

INPUT_TOPIC = 'wordcount-input'
OUTPUT_TOPIC = 'wordcount-output'
INPUT_SUB = 'wordcount-input-sub'
OUTPUT_SUB = 'wordcount-output-sub'

WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000  # in milliseconds

class TestIT(unittest.TestCase):
    def setUp(self):
        self.test_pipeline = TestPipeline(is_integration_test=True)
        self.project = self.test_pipeline.get_option('project')
        self.uuid = str(uuid.uuid4())

        # Set up PubSub environment.
        from import pubsub
        self.pub_client = pubsub.PublisherClient()
        self.input_topic = self.pub_client.create_topic(
            self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
        self.output_topic = self.pub_client.create_topic(
            self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

        self.sub_client = pubsub.SubscriberClient()
        self.input_sub = self.sub_client.create_subscription(
            self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
        self.output_sub = self.sub_client.create_subscription(
            self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
    def _inject_numbers(self, topic, num_messages):
        """Inject numbers as test data to PubSub."""
        logging.debug('Injecting %d numbers to topic %s', num_messages,
        for n in range(num_messages):
            self.pub_client.publish(, str(n).encode('utf-8'))

    def tearDown(self):
        test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub, self.output_sub])
        test_utils.cleanup_topics(self.pub_client, [self.input_topic, self.output_topic])
    def test_pubsub_pipe_it(self):
        # Build expected dataset.
        expected_msg = [('%d' % num).encode('utf-8') for num in range(DEFAULT_INPUT_NUMBERS)]

        # Set extra options to the pipeline for test purpose
        state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
        pubsub_msg_verifier = PubSubMessageMatcher(self.project,, expected_msg, timeout=400)
        extra_opts = {
            'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
            'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)

        # Generate input data and inject to PubSub.
        self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)

        # Get pipeline options from command argument: --test-pipeline-options,
        # and start pipeline job by calling pipeline main function.**extra_opts))

if __name__ == '__main__':

I am getting this error in the dataflow logs

Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 638, in process_bundle
    instruction_id, request.process_bundle_descriptor_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 467, in get
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 925, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 909, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 1546, in create_par_do
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/", line 1582, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/", line 294, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'

passed through:

The command I'm running is

pytest --log-cli-level=INFO --test-pipeline-options="--runner=TestDataflowRunner \
    --project=$PROJECT --region=europe-west1 \
    --staging_location=gs://$BUCKET/staging \
    --temp_location=gs://$BUCKET/temp \
    --job_name=it-test-pipeline \
    --setup_file ./"

My repo for this pipeline can be found here


Upvotes: 3

Views: 1251

Answers (1)

Udi Meiri
Udi Meiri

Reputation: 1103

The integration tests are designed to be run by Beam's CI/CD infrastructure. They are nose based and require a custom plugin to understand the --test-pipeline-options flag. I wouldn't recommend going this route.

I would follow the quick start guide that Ricco D suggested for the environment. You could use pytest to run the integration test. To use the same --test-pipeline-options flag, you'll need this definition. Otherwise the wordcount example shows how to set up your own command line flags.


I used this to set up the virtualenv:

pip install apache-beam[gcp,test]

The test tag pulls in pytest, but should not be required if you already have pytest installed.

I then created this file to configure pytest (based on Beam's own

def pytest_addoption(parser):
                   help='Options to use in test pipelines. NOTE: Tests may '
                        'ignore some or all of these options.')

To run the test:

pytest --log-cli-level=INFO --test-pipeline-options="--runner=TestDataflowRunner --project=PROJECT --region=us-west1 --staging_location=gs://BUCKET/staging --temp_location=gs://BUCKET/temp --output=gs://BUCKET/output "

Not all options in --test-pipeline-options may be required for your test.

Upvotes: 3

Related Questions