Reputation: 2342
I am trying to delete all the files that are 0 bytes in a Google Cloud Storage bucket. I want to do this with Apache Beam and a Dataflow runner that will run in a Google Cloud project. What I have right now is this (I have hidden some details with <***>):
import argparse

import apache_beam as beam
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)


class DetectEmpty(beam.DoFn):
    def process(self, file_path):
        # gfs is created in run() and is not in scope here
        if gfs.size(file_path) == 0:
            yield file_path


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', default=<***>, help='<***>')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = '<***>'
    google_cloud_options.job_name = '<***>'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

    gfs = gcs.GCSFileSystem(pipeline_options)
    p = beam.Pipeline(options=pipeline_options)
    images = p | 'read directory' >> ReadFromText(known_args.input)
    empty_images = images | 'discover empty files' >> beam.ParDo(DetectEmpty())
    p.run()
Some of my questions are:
How do I pass apache_beam.io.gcp.gcsfilesystem.GCSFileSystem to the DoFn?
Upvotes: 1
Views: 1872
Reputation: 3010
You don't need to read the files in order to detect empty ones; you can use the FileSystem object directly to check the file sizes and delete as needed. The match() function returns one MatchResult per pattern, and each FileMetadata object in its metadata_list includes the size of the file.
Something like:
class DeleteEmpty(beam.DoFn):
    def __init__(self, gfs):
        self.gfs = gfs

    def process(self, file_metadata):
        if file_metadata.size_in_bytes == 0:
            # use the instance attribute; a bare gfs is not defined in this scope
            self.gfs.delete([file_metadata.path])

# match() returns a list with one MatchResult per pattern, so take the first
files = (
    p
    | 'Filenames' >> beam.Create(gfs.match([<directory glob pattern>])[0].metadata_list)
    | 'Reshuffle' >> beam.Reshuffle()  # this allows the downstream code to be parallelized after the Create
    | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs))
)
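If you want to try the size check before pointing it at a real bucket, here is a minimal sketch that runs without Beam or GCS. FakeMetadata and select_empty are hypothetical names, not part of Beam; the namedtuple only mimics the path and size_in_bytes attributes that the DoFn above reads, and the bucket paths are made up.

```python
from collections import namedtuple

# Hypothetical stand-in for Beam's FileMetadata; only the two attributes
# read by the DoFn above are modelled.
FakeMetadata = namedtuple("FakeMetadata", ["path", "size_in_bytes"])


def select_empty(metadata_list):
    """Return the paths of all entries whose size is exactly 0 bytes."""
    return [m.path for m in metadata_list if m.size_in_bytes == 0]


# Made-up listing standing in for a match() result's metadata_list.
listing = [
    FakeMetadata("gs://example-bucket/a.png", 0),
    FakeMetadata("gs://example-bucket/b.png", 2048),
    FakeMetadata("gs://example-bucket/dir/c.png", 0),
]

print(select_empty(listing))
```

Keeping the selection separate from the delete call also makes it easy to log or review the list of paths before anything is removed.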
GCS doesn't really have folders; they are just a convenience added when using the UI or gsutil. When there are no objects in a folder, that folder just doesn't exist. See https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork
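To illustrate the point about folders, here is a simplified model, not real GCS code: it treats a bucket as a flat list of object names and derives the "folders" a UI would show from the name prefixes. visible_folders and the object names are hypothetical.

```python
def visible_folders(object_names):
    """Derive the 'folders' a UI would display from flat object names."""
    folders = set()
    for name in object_names:
        parts = name.split("/")[:-1]  # drop the object's own base name
        for depth in range(1, len(parts) + 1):
            folders.add("/".join(parts[:depth]) + "/")
    return sorted(folders)


# Made-up bucket contents: just a flat list of names.
objects = ["images/cats/1.png", "images/dogs/2.png", "readme.txt"]
print(visible_folders(objects))

# Remove the only object under images/dogs/ and that "folder" disappears,
# because nothing with that prefix is left.
objects.remove("images/dogs/2.png")
print(visible_folders(objects))
```

In this model there is nothing extra to clean up once the empty files are gone: a prefix with no remaining objects simply stops showing up.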
Upvotes: 1