user3777228

Reputation: 159

Access File inside the dataflow pipeline

I want to download certain files to a temp location before the pipeline starts. The files are .mmdb files which are read inside the ParDo function. They are stored on Google Cloud Storage, but the method consuming the .mmdb files requires them to be java.io.File objects.

If I include them in --filesToStage, they are available as InputStreams inside the staged zip. I want to access them as files, not InputStreams. What is the best way to achieve this?

I am currently downloading the files into a temporary folder on the worker inside the Setup method of the ParDo.
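A minimal sketch of that approach, with the class and helper names being hypothetical: in a real pipeline the InputStream would be opened in @Setup from the GCS client or Beam's FileSystems, and the stream is materialized into a local temp file so that a library which insists on a java.io.File (like the .mmdb reader) can open it.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

class TempFileMaterializer {
    // Copy a stream (e.g. opened from GCS) into a local temp file and
    // return the File handle. REPLACE_EXISTING is required because
    // createTempFile already creates an empty file at that path.
    static File toTempFile(InputStream in) throws IOException {
        File tmp = File.createTempFile("geoip-", ".mmdb");
        tmp.deleteOnExit(); // clean up when the worker JVM exits
        Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
        return tmp;
    }
}
```

In a DoFn this would run once per worker in @Setup, and the resulting File can be cached in a transient field for use in processElement.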

Upvotes: 0

Views: 1489

Answers (2)

Rob Knights

Reputation: 247

I think I understand what you are/were trying to do and I was looking to do the same.

This worked for me in the Python SDK (in the setup() method of the DoFn):

    import shutil
    from apache_beam.io.filesystems import FileSystems

    # Copy the database from its cloud location to a local path,
    # unless it is already present on the worker.
    if not FileSystems.exists(local_db_location):
        with FileSystems.open(self._cloud_database_loc) as af:
            with FileSystems.create(local_db_location) as local_file:
                shutil.copyfileobj(af, local_file, length=131072)
    # else: the database already exists locally

Upvotes: 0

Anton

Reputation: 2539

This is a very broad and high-level question. The answer depends on the logic that consumes the files. java.io.File represents a file on a local filesystem, so if you have a component that requires its input to be an instance of File, then writing it to a local temp folder is the correct thing to do. Beam doesn't provide a better abstraction for this case.

However, I would recommend looking into updating the logic that currently handles Files so it accepts other kinds of input as well. You have likely hit an issue caused by a lack of separation of concerns and tight coupling. That is, you have a component that takes in a File, opens it, deals with errors while opening it, reads it, parses data from it, and maybe even validates and processes the data. All of these are separate concerns and should probably be handled by separate components that you can combine and replace as needed, for example:

  • a class that knows how to deal with a filesystem and turn a path into a byte stream;
  • a similar class that knows how to fetch a file over HTTP (e.g. the GCS use case) and turn it into a byte stream;
  • a component that knows how to parse the byte stream into data;
  • a component that processes the parsed data;
  • other things can probably live anywhere.

This way you can easily implement other sources for your component, and compose and test the pieces independently.
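The decomposition above can be sketched as follows; the interface and class names are made up for illustration, and the "parser" is a trivial stand-in for whatever actually decodes the .mmdb bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// One concern: producing a byte stream, regardless of where it lives.
interface ByteSource {
    InputStream open() throws IOException;
}

// Turns a local filesystem path into a byte stream.
class LocalFileSource implements ByteSource {
    private final String path;
    LocalFileSource(String path) { this.path = path; }
    public InputStream open() throws IOException {
        return Files.newInputStream(Paths.get(path));
    }
}

// For tests or non-file inputs: an in-memory byte stream.
class InMemorySource implements ByteSource {
    private final byte[] bytes;
    InMemorySource(byte[] bytes) { this.bytes = bytes; }
    public InputStream open() { return new ByteArrayInputStream(bytes); }
}

// Another concern: parsing the stream into data. It never sees a File.
class Parser {
    String parse(InputStream in) throws IOException {
        return new String(in.readAllBytes());
    }
}
```

The processing component then depends only on ByteSource and Parser, so production code can wire in LocalFileSource (or a GCS-backed source) while tests inject InMemorySource.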

For example, you could implement your logic as two joined PCollections, one of which would read from the GCS location directly, parse the text lines, and apply the actual business logic before joining with the other PCollection.

Upvotes: 2
