Kimor

Reputation: 602

Dynamically access file in dataflow pipeline

I am using a Dataflow pipeline and I'd like to be able to access files from within it.

This is where I call the method that gets the files and extracts the modules:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    """Run pipeline"""
    options: PipelineOptions = PipelineOptions(
        project='production-213911',
        runner='DataflowRunner',
        region='europe-west1',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorithm='THROUGHPUT_BASED',
    )
    proto = Container().protobuf()
    test = proto.get_proto_file('data/build')
    proto.get_obj_from_file(test)

    with beam.Pipeline(options=options) as pipeline:
        ...

This is the pipeline step where I'd like to use my list of modules:

        status_records = (
            status
            | 'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, proto.protos))
        )

And here is the code that browses the directory to get the files:

import importlib
import os
from typing import List

@staticmethod
def get_proto_file(dirname: str = 'python_protocol') -> List[object]:
    """Iterate through a dir with built .proto files
    :param
        dirname = name of the directory to browse
    :return
        List[object] with modules
    """
    protos: List[object] = []
    for root, _, files in os.walk(dirname):
        for name in files:
            # Only keep generated *_pb2.py modules, skip compiled .pyc files
            if 'pb2' in name and 'pyc' not in name:
                print(name)
                module = name[:-3]  # strip the '.py' extension
                # Build the dotted import path from the file's directory
                package = root.replace(os.sep, '.')
                path = f'{package}.{module}'
                imported_module = importlib.import_module(path)
                protos.append(imported_module)
    return protos

However, my 'proto.protos' variable is always set to None (meaning test is None; I am sure the problem is in the first step).

I tried calling this line from a file at the same level as my pipeline and it works:

test = proto.get_proto_file('data/build')

So I am guessing this is because my files are not available in Dataflow the way they are in my project. Any idea how to do this?

thanks :)

Upvotes: 0

Views: 325

Answers (1)

guillaume blaquiere

Reputation: 75715

It's a common problem.

  • Firstly, you have to understand how Beam works. When you prepare your pipeline, you are on the main server, where all your code and all your files live. The pipeline is then built (yes, your pipeline is compiled/translated into Java for efficiency reasons, because Python is too slow; if the new runner is already deployed, your code is compiled into C++, but either way the Python disappears at runtime) and shipped to the worker servers together with the pipeline options.

  • Then, you should understand the issue: the compiled pipeline and the options are shipped, not your files!

How to solve this?

Because the pipeline options are sent along with your compiled pipeline, load your file on the main server (before your pipeline starts) and store the content in the pipeline options.

Then read it from the options in your transform.
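Here is a minimal sketch of that idea, assuming a hypothetical custom option named proto_content and a hypothetical file path; the names are illustrative, not from the original code:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    class ProtoOptions(PipelineOptions):
        """Custom options carrying the file content to the workers."""

        @classmethod
        def _add_argparse_args(cls, parser):
            # The file content travels inside the options, so it is
            # available on the workers even though the file itself is not.
            parser.add_argument('--proto_content', default='')


    def run():
        # On the main server: read the file while it is still accessible.
        with open('data/build/example.txt') as f:
            content = f.read()

        options = ProtoOptions(proto_content=content)

        with beam.Pipeline(options=options) as pipeline:
            # In the transforms: read the content back from the options
            # instead of touching the local filesystem on the worker.
            proto_content = options.view_as(ProtoOptions).proto_content
            (
                pipeline
                | beam.Create(['record'])
                | beam.Map(lambda x: (x, len(proto_content)))
            )

Note that anything stored this way must be serializable, so ship the raw file content (e.g. strings or bytes) rather than imported module objects.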

Upvotes: 2
