SDG

Reputation: 2342

Filter through files in GCS bucket folder and delete 0 byte files with Dataflow

I am trying to delete all files that are 0 bytes in size within a Google Cloud Storage bucket. I want to do this with Apache Beam and a Dataflow runner running in a Google Cloud project. What I have right now is this (I have hidden some details with <***>):

import argparse

import apache_beam as beam
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)

class DetectEmpty(beam.DoFn):
    def __init__(self, gfs):
        # Keep the filesystem on the DoFn so process() can reach it.
        self.gfs = gfs

    def process(self, file_path):
        # Emit only the paths whose object size is 0 bytes.
        if self.gfs.size(file_path) == 0:
            yield file_path

def run(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', default=<***>, help='<***>')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = '<***>'
    google_cloud_options.job_name = '<***>'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    gfs = gcs.GCSFileSystem(options)
    p = beam.Pipeline(options=options)

    images = p | 'read directory' >> ReadFromText(known_args.input)
    empty_images = images | 'discover empty files' >> beam.ParDo(DetectEmpty(gfs))

    p.run()

Some of my questions are:

Upvotes: 1

Views: 1872

Answers (1)

danielm

Reputation: 3010

You don't need to read the files in order to detect empty ones. You can use the FileSystem object directly to check the file sizes and delete as needed. The FileMetadata objects returned by the match() function include the size of each file.

Something like

import apache_beam as beam

class DeleteEmpty(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    # FileMetadata carries the size, so no file contents are read.
    if file_metadata.size_in_bytes == 0:
      self.gfs.delete([file_metadata.path])

# match() returns one MatchResult per pattern, so take the first result.
files = (p
         | 'Filenames' >> beam.Create(gfs.match([<directory glob pattern>])[0].metadata_list)
         | 'Reshuffle' >> beam.Reshuffle()  # this allows the downstream code to be parallelized after the Create
         | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs)))
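
The size check itself can be tried out without a bucket or a Dataflow job. Below is a minimal sketch, assuming only that each metadata object exposes `path` and `size_in_bytes` as in the snippet above. `FakeMetadata`, `empty_paths`, and the sample paths are hypothetical names made up for illustration; they are not part of Beam.

```python
from collections import namedtuple

# Hypothetical stand-in for Beam's FileMetadata; only the two attributes
# used by the snippet above are modeled.
FakeMetadata = namedtuple("FakeMetadata", ["path", "size_in_bytes"])


def empty_paths(metadata_list):
    """Return the paths of all entries whose size is exactly 0 bytes."""
    return [m.path for m in metadata_list if m.size_in_bytes == 0]


# Made-up listing, standing in for what a match() call might return.
listing = [
    FakeMetadata("gs://example-bucket/images/a.png", 1024),
    FakeMetadata("gs://example-bucket/images/b.png", 0),
    FakeMetadata("gs://example-bucket/images/c.png", 0),
]

to_delete = empty_paths(listing)
print(to_delete)
```

Since delete() already takes a list of paths, a list like `to_delete` could probably be passed to it in a single call when the listing is small enough that parallelism is not needed.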

GCS doesn't really have folders; they are just a convenience added when using the UI or gsutil. When there are no objects in a folder, that folder just doesn't exist. See https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork
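
To illustrate why an emptied folder simply vanishes, here is a toy model of a flat object namespace in plain Python. It is not the GCS API: the `objects` dict, the `list_folders` helper, and the object names are all made up for illustration.

```python
# Toy model: a bucket is a flat mapping from full object name to size in bytes.
objects = {
    "images/cats/a.png": 0,
    "images/cats/b.png": 0,
    "images/dogs/c.png": 2048,
}


def list_folders(objs, prefix=""):
    """Derive the apparent 'folders' directly under prefix from object names."""
    folders = set()
    for name in objs:
        if name.startswith(prefix):
            rest = name[len(prefix):]
            if "/" in rest:
                folders.add(prefix + rest.split("/", 1)[0] + "/")
    return sorted(folders)


before = list_folders(objects, "images/")

# Drop every 0-byte object, mimicking what the delete step would do.
objects = {name: size for name, size in objects.items() if size > 0}

after = list_folders(objects, "images/")
print(before)
print(after)
```

In this model "images/cats/" is never deleted explicitly; it stops appearing once no object name starts with that prefix.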

Upvotes: 1
