I want to download certain files from Google Cloud Storage to a temporary location before the pipeline starts. They are .mmdb files that are read in the ParDo function. The method that consumes the .mmdb files requires them to be a java.io.File object.
If I include them in --filesToStage, they are available as an InputStream inside the zip. I want to access them as files, not as an InputStream. What is the best way to achieve this?
I am currently downloading the files to a temporary folder on the worker inside the Setup method of the ParDo.
I think I understand what you are/were trying to do and I was looking to do the same.
This worked for me with the Beam Python SDK, in the setup() method of the DoFn:
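```python
import shutil
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class DownloadDbDoFn(beam.DoFn):  # hypothetical DoFn name
    def __init__(self, cloud_database_loc, local_db_location):
        self._cloud_database_loc = cloud_database_loc  # e.g. a gs:// path
        self._local_db_location = local_db_location  # path on worker disk

    def setup(self):
        # Copy the DB from GCS to local disk once; reuse it if it already exists.
        if not FileSystems.exists(self._local_db_location):
            with FileSystems.open(self._cloud_database_loc) as af:
                with FileSystems.create(self._local_db_location) as local_file:
                    shutil.copyfileobj(af, local_file, length=131072)
```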
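One caveat with the check-then-copy approach: if several DoFn instances on the same worker run setup() at the same time, one of them may see the file as existing while another is still writing it. A common remedy is to download into a temporary file in the target directory and then rename it into place with os.replace, which is atomic on POSIX filesystems when source and target are on the same filesystem. The stdlib-only sketch below assumes a hypothetical helper name, ensure_local_copy, and a hypothetical open_source callable; in a Beam job that callable could be something like lambda: FileSystems.open(cloud_path).

```python
import os
import shutil
import tempfile


def ensure_local_copy(open_source, local_path, chunk_size=131072):
    """Copy a remote file to local_path once, never exposing a partial file.

    open_source is a hypothetical zero-argument callable returning a readable
    binary file object, e.g. lambda: FileSystems.open(cloud_path) in Beam.
    """
    if os.path.exists(local_path):
        return local_path  # an earlier call already completed the copy
    directory = os.path.dirname(os.path.abspath(local_path))
    os.makedirs(directory, exist_ok=True)
    # Download into a uniquely named temporary file in the target directory.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as dst, open_source() as src:
            shutil.copyfileobj(src, dst, length=chunk_size)
        # Rename into place; on POSIX this is atomic within one filesystem,
        # so readers see either no file or the complete file.
        os.replace(tmp_path, local_path)
    except BaseException:
        # Remove the partial download, then propagate the error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return local_path
```

If two instances race, both write complete temporary files and the last rename wins, so the file at local_path is always whole.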
This is a very broad, high-level question, and the answer depends on the logic that consumes the files. File represents a file on a filesystem, so if you have a component that requires its input to be an instance of File, then writing the data to a local temp folder is the correct thing to do. Beam doesn't provide a better abstraction for this case.
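Writing to a local temp folder pairs naturally with cleaning it up again, mirroring a DoFn's setup/teardown lifecycle (setup() and teardown() in the Python SDK, @Setup and @Teardown in Java). The sketch below is stdlib-only so it runs without Beam; LocalCopy, open_source and filename are hypothetical names, and in a real pipeline open_source would wrap the GCS read.

```python
import io
import os
import shutil
import tempfile


class LocalCopy:
    """Hypothetical helper following a DoFn-style setup/teardown lifecycle.

    open_source is any zero-argument callable that returns a readable binary
    file object; in a Beam job it could wrap FileSystems.open(gcs_path).
    """

    def __init__(self, open_source, filename):
        self._open_source = open_source
        self._filename = filename
        self._tmp_dir = None
        self.path = None  # local path to hand to code that needs a real file

    def setup(self):
        # Create a private temporary directory on the worker's local disk.
        self._tmp_dir = tempfile.mkdtemp(prefix="mmdb-")
        self.path = os.path.join(self._tmp_dir, self._filename)
        # Stream the remote bytes into a regular local file.
        with self._open_source() as src, open(self.path, "wb") as dst:
            shutil.copyfileobj(src, dst)

    def teardown(self):
        # Delete the temporary directory and the downloaded file with it.
        if self._tmp_dir is not None:
            shutil.rmtree(self._tmp_dir, ignore_errors=True)
            self._tmp_dir = None
            self.path = None


# Usage with an in-memory stand-in for the GCS object:
db_copy = LocalCopy(lambda: io.BytesIO(b"placeholder bytes"), "example.mmdb")
db_copy.setup()
print(os.path.isfile(db_copy.path))  # True
db_copy.teardown()
```

Note that Beam does not guarantee teardown() is called (for example if a worker is lost), so a private directory under the system temp location is a safer target than a fixed shared path.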
However, I would recommend looking into updating the logic that currently handles Files so that it accepts other kinds of input as well. You have likely hit an issue caused by a lack of separation of concerns, i.e. tight coupling: you have a component that takes in a File, opens it, deals with errors while opening it, reads it, parses data from it, and maybe even validates and processes the data. All of these are separate concerns and should probably be handled by separate components that you can combine and replace as needed, for example one component that opens and reads the input, one that parses the data, and one that validates and processes it.
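A minimal Python sketch of such a split, assuming (hypothetically) that the lookup data is available as CSV text with start,end,country columns rather than as a binary .mmdb file. IpRange, parse_ranges and find_country are illustrative names, not part of any library. The parser only consumes an iterable of text lines, so it never needs a File:

```python
import csv
import io
from typing import Iterable, Iterator, NamedTuple, Optional


class IpRange(NamedTuple):
    """Hypothetical record: one row of an IP-range lookup table."""
    start: int
    end: int
    country: str


def parse_ranges(lines: Iterable[str]) -> Iterator[IpRange]:
    # Parsing concern: text lines in, records out. No paths, no opening, no GCS.
    for row in csv.reader(lines):
        yield IpRange(int(row[0]), int(row[1]), row[2])


def find_country(ranges: Iterable[IpRange], ip: int) -> Optional[str]:
    # Business logic: works on parsed records, wherever they came from.
    for r in ranges:
        if r.start <= ip <= r.end:
            return r.country
    return None


# The source is pluggable: open(path), an in-memory stream as here, or lines
# read by Beam's ReadFromText can all be fed to parse_ranges.
source = io.StringIO("0,255,AA\n256,511,BB\n")
ranges = list(parse_ranges(source))
print(find_country(ranges, 300))  # BB
```

Because parse_ranges and find_country never touch the filesystem, each can be unit-tested with plain lists of strings.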
This way you can easily implement any other sources for your component, compose and test them independently.
For example, you could implement your logic as two joined PCollections, one of which reads from the GCS location directly, parses the text lines, and processes them with the actual business logic before being joined with the other PCollection.
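To make the join idea concrete without depending on a Beam installation, here is a plain-Python model of what a keyed join over tagged inputs produces. In the Beam Python SDK the corresponding step would be something like {'events': events, 'geo': geo} | beam.CoGroupByKey(), which pairs each key with the values from each tagged input. The names events, geo, co_group_by_key and enrich are hypothetical, and keying by exact IP address is a simplification (range-based lookups would need a different key).

```python
from collections import defaultdict


def co_group_by_key(tagged):
    """Plain-Python model of a keyed join over tagged inputs.

    tagged maps a tag to an iterable of (key, value) pairs. The result maps
    each key to {tag: [values]}, with an empty list for tags lacking the key.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


def enrich(key, groups):
    # Business logic after the join: attach the looked-up country to each event.
    countries = groups["geo"]
    country = countries[0] if countries else "unknown"
    return [(key, event, country) for event in groups["events"]]


events = [("1.2.3.4", "click"), ("5.6.7.8", "view"), ("1.2.3.4", "view")]
geo = [("1.2.3.4", "NL")]  # hypothetical parsed lookup rows keyed by IP
joined = co_group_by_key({"events": events, "geo": geo})
for key in sorted(joined):
    print(enrich(key, joined[key]))
```

In a pipeline, enrich would be applied to each joined element (for example with beam.FlatMap), so the lookup data never has to exist as a local file.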