Reputation: 964
I have an Apache Beam pipeline that works fine in both local and cloud modes. However, I have an end-to-end integration test (IT) that runs on every MR, and the IT is submitted to Dataflow.
This time, the IT is throwing the following error:
ModuleNotFoundError: No module named 'main'
The stack trace doesn't point at all to the place where the module is not recognised, just the following:
job-v2-test-20-08160911-vs73-harness-drt8
Root cause: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py", line 285, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'
The main module is used only in the IT file; it doesn't appear in any transform of the pipeline. Also, when I run the IT, about half of the pipeline's transforms run successfully until it fails with the error above.
The IT code:
from datetime import datetime
import argparse
import logging
import unittest

from hamcrest.core.core.allof import all_of

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

from main import run

# get_bq_instance and the IT_* constants are project helpers/fixtures.

class PipelineIT(unittest.TestCase):
    def setUp(self):
        self.test_pipeline = TestPipeline(is_integration_test=True)
        parser = argparse.ArgumentParser()
        self.args, self.beam_args = parser.parse_known_args()
        self.pipeline_options = PipelineOptions(self.beam_args)
        self.client = get_bq_instance()
        self.tables_timestamp = datetime.now().strftime("%Y%m%d%H%M")

    def test_mc_end_to_end(self):
        # Verify the Dataflow job reaches the DONE state.
        state_verifier = PipelineStateMatcher(PipelineState.DONE)
        extra_opts = {
            'input': IT_BUCKET,
            'output_dataset': IT_DATASET,
            'output': IT_OUTPUT,
            'bq_timestamp': self.tables_timestamp,
            'on_success_matcher': all_of(state_verifier),
        }
        run(self.test_pipeline.get_full_options_as_args(**extra_opts),
            save_main_session=True)
        # bunch of asserts
The command I'm using to run the IT:
coverage run -m pytest --log-cli-level=INFO integration_tests/end_to_end_it_test.py --job_name "end_to_end_it" --test-pipeline-options=" --run_mode=cloud --mode=test --setup_file=path_to_setup.py"
The pipeline works fine in production mode, but in testing mode it shows this error.
I'm just wondering: if main is used only to trigger the integration test locally, how can it break the pipeline with this error?
Upvotes: 0
Views: 648
Reputation: 964
After deep investigation, I found that my pipeline was using beam.Filter in the following way:
dropped_and_missing = (
    all_recs
    | 'Filter Dropped and Missing recs' >> beam.Filter(
        lambda rec: rec['existing_status'] == 'Dropped'
        or rec['existing_status'] == 'Missing'))
Replacing that code block with a PTransform based on if conditions solved the issue.
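For illustration, here is a minimal sketch of that kind of replacement; the DoFn name and structure are illustrative, not my exact code:

import apache_beam as beam

class FilterDroppedAndMissing(beam.DoFn):
    """Illustrative DoFn replacing the lambda-based beam.Filter."""
    def process(self, rec):
        # Emit only records whose status is 'Dropped' or 'Missing'.
        if rec['existing_status'] == 'Dropped':
            yield rec
        elif rec['existing_status'] == 'Missing':
            yield rec

dropped_and_missing = (
    all_recs
    | 'Filter Dropped and Missing recs' >> beam.ParDo(FilterDroppedAndMissing()))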
I don't know exactly where the issue is. I tried to dig into the Beam source code, checking whether there is a main module reference in the Filter function, but there isn't one.
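My best guess at the mechanism is sketched below; this is an assumption on my side, not something I confirmed in the Beam source. dill appears to serialize the lambda together with a reference to the module that defines it, which would match the _import_module frame in the stack trace:

import dill

# Hypothetical repro of the worker-side failure: the pickled lambda
# keeps a reference to its defining module, and dill.loads() calls
# __import__ on that module name (the _import_module frame in the
# trace). On a worker where 'main' is not importable, that raises
# ModuleNotFoundError: No module named 'main'.
payload = dill.dumps(lambda rec: rec['existing_status'] == 'Dropped')
dill.loads(payload)  # fine locally; fails where the module is missing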
Also, what's suspicious is that the error occurs only when running the integration test from the command line. The pipeline works fine with LocalRunner and DataflowRunner.
Upvotes: 1