Reputation: 602
I am using a Dataflow pipeline and I'd like to be able to access files from within it.
This is where I call the method that collects the files and extracts the modules:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    """Run the pipeline."""
    options: PipelineOptions = PipelineOptions(
        project='production-213911',
        runner='DataflowRunner',
        region='europe-west1',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorithm='THROUGHPUT_BASED',
    )
    proto = Container().protobuf()
    test = proto.get_proto_file('data/build')
    proto.get_obj_from_file(test)
    with beam.Pipeline(options=options) as pipeline:
        ...
This is the pipeline step where I'd like to use my list of modules:
status_records = (status | 'Proto to Dict' >> beam.Map(
    lambda x: convert_proto_to_dict(x, proto.protos)))
And here is the code that browses the directory to collect the files:
# module-level imports needed by this method
import importlib
import os
from typing import List

@staticmethod
def get_proto_file(dirname: str = 'python_protocol') -> List[object]:
    """Iterate through a directory of built .proto files.

    :param dirname: name of the directory to browse
    :return: List[object] of imported modules
    """
    protos: List[object] = []
    for root, _, files in os.walk(dirname):
        for name in files:
            # Keep generated *_pb2.py modules, skip compiled .pyc files.
            if 'pb2' in name and 'pyc' not in name:
                print(name)
                module = name[:-3]  # strip the '.py' extension
                if '/' in root:
                    # Turn the filesystem path into a dotted import path.
                    dirname = root.replace('/', '.')
                path = f'{dirname}.{module}'
                imported_module = importlib.import_module(path)
                protos.append(imported_module)
    return protos
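For illustration, here is how that path construction is meant to resolve on a hypothetical build tree (the file names below are assumptions, not my real layout):

# Hypothetical layout:
#   data/build/status_pb2.py
#   data/build/events/event_pb2.py
# os.walk('data/build') yields the roots 'data/build' and 'data/build/events';
# replacing '/' with '.' turns each root into an importable package path:
#   'data/build'        -> 'data.build'        -> import_module('data.build.status_pb2')
#   'data/build/events' -> 'data.build.events' -> import_module('data.build.events.event_pb2')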
However, my 'proto.protos' variable is always set to None (meaning test is None; I am sure the problem is in this first step).
I tried to call this line from a file at the same level as my pipeline, and it works:
test = proto.get_proto_file('data/build')
So I am guessing this is because my files are not present on Dataflow the way they are in my project. Any idea how to do this?
thanks :)
Upvotes: 0
Views: 325
Reputation: 75715
It's a common problem.
Firstly, you have to understand how Beam works. When you prepare your pipeline, you are on the main server, where all of your code and all of your files live. The pipeline is then built (yes, your pipeline is compiled/translated to Java for efficiency reasons, because Python is too slow; if the new runner is already deployed, your code is compiled to C++, but either way the Python disappears at runtime) and shipped to the worker servers along with the pipeline options.
Then, you should understand the issue: the compiled pipeline and the options are shipped, not your files!
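To make that concrete, here is a minimal sketch of the failure mode (the directory name is illustrative): anything that runs lazily inside a transform executes on a worker, where the launch machine's files do not exist.

import apache_beam as beam

def walk_on_worker(msg):
    # Imported here so the function is self-contained when it runs remotely.
    import os
    # 'data/build' only exists on the machine that launched the job,
    # so on a Dataflow worker this walk yields nothing.
    return msg, list(os.walk('data/build'))

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(['one message'])
        | beam.Map(walk_on_worker)
    )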
How to solve this?
Because the pipeline options are sent along with your compiled pipeline, load your file on the main server (before your pipeline starts) and store its content in the pipeline options.
Then read it back from the options in your transform.
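For example, here is a minimal sketch of that pattern (the option name proto_descriptor, the file path, and the convert() helper are illustrative, not the real names from the question):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ProtoOptions(PipelineOptions):
    """Custom options that carry the file content with the pipeline."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--proto_descriptor', type=str, default='')

def convert(message: bytes, descriptor: bytes) -> dict:
    # Illustrative stand-in for the question's convert_proto_to_dict().
    return {'payload': message, 'descriptor_size': len(descriptor)}

def run():
    # 1. On the main server, read the file BEFORE the pipeline starts.
    with open('data/build/messages.desc', 'rb') as handle:  # illustrative path
        descriptor_hex = handle.read().hex()

    # 2. Store the CONTENT (not the path) in the pipeline options.
    options = ProtoOptions(
        runner='DataflowRunner',
        project='production-213911',
        region='europe-west1',
        proto_descriptor=descriptor_hex,
    )

    with beam.Pipeline(options=options) as pipeline:
        # 3. Read the value back from the options; the lambda captures it,
        #    and the captured string ships to the workers with the pipeline.
        descriptor = bytes.fromhex(options.view_as(ProtoOptions).proto_descriptor)
        _ = (
            pipeline
            | beam.Create([b'raw protobuf payload'])
            | 'Proto to Dict' >> beam.Map(lambda msg: convert(msg, descriptor))
        )

The key point is that the transform receives the bytes that were read up front, never a path that would have to be resolved on a worker.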
Upvotes: 2