Raul

Reputation: 35

How to sense multiple files using Airflow FileSensor?

I am creating a DAG and trying to sense multiple files in a local folder, but the sensor does not detect all the files present in the folder. What is the correct logic?

sensing_task=FileSensor(task_id='senseFile',
                        filepath='dags/source/*.csv',
                        poke_interval=10,
                        fs_conn_id='fs_my_conn')

When it executes, it searches for a file literally named "*.csv" rather than treating the value as a pattern.

Upvotes: 2

Views: 2573

Answers (2)

Scott Luisi

Reputation: 11

Not sure what pattern you are referring to in your comment: "it is actually searching for "*.csv" file and not the pattern."

The filepath is set to search for "*.csv": filepath='dags/source/*.csv'

I had the same requirement. My solution was to use the FileSensor as you did, and then add a second task that uses the BashOperator to run a bash script that processes one or more files. The script uses a for loop to go through all new files and a case statement to apply the logic associated with each filename pattern.
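For anyone who would rather keep that step in Python, here is a rough sketch of the same loop-plus-case idea using the standard library's fnmatch. The patterns and handler labels are hypothetical placeholders, not the ones from my script, so swap in your own.

```python
import fnmatch
import os

# Hypothetical pattern -> handler-label table. Order matters: the first
# matching pattern wins, the same way a bash case statement behaves.
HANDLERS = [
    ("sales_*.csv", "load_sales"),
    ("inventory_*.csv", "load_inventory"),
    ("*.csv", "load_generic"),
]


def pick_handler(filename):
    """Return the label of the first pattern that matches, or None."""
    for pattern, label in HANDLERS:
        if fnmatch.fnmatch(filename, pattern):
            return label
    return None


def plan_processing(folder):
    """Map each file name in folder to a handler label, skipping unmatched files."""
    plan = {}
    for name in sorted(os.listdir(folder)):
        label = pick_handler(name)
        if label is not None:
            plan[name] = label
    return plan
```

A function like plan_processing could be called from a PythonOperator placed after the sensor, with the labels replaced by real callables.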

Upvotes: 0

flakes

Reputation: 23684

It looks like you probably need to define your own sensor. Going by the source code for FileSensor, you could probably do something like this to implement globbing:

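import glob
import os

# Import paths below are for Airflow 1.10. In Airflow 2 the equivalents are
# airflow.hooks.filesystem.FSHook and airflow.sensors.base.BaseSensorOperator.
from airflow.contrib.hooks.fs_hook import FSHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class GlobSensor(BaseSensorOperator):
    template_fields = ('filepath',)
    ui_color = '#91818a'

    @apply_defaults
    def __init__(self, filepath, fs_conn_id='fs_default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath      # glob pattern, relative to the connection's base path
        self.fs_conn_id = fs_conn_id

    def poke(self, context):
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = os.path.join(basepath, self.filepath)
        self.log.info('Poking for glob %s', full_path)
        # Succeed as soon as at least one path matches the pattern.
        return len(glob.glob(full_path)) > 0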
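
If you want to sanity-check the matching logic without running Airflow, the glob call in poke can be tried on its own. This is only a sketch: glob_matches is a helper name I made up, and the temporary directory stands in for the base path that the fs connection would return.

```python
import glob
import os
import tempfile


def glob_matches(basepath, pattern):
    """Return the sorted paths under basepath that match the glob pattern."""
    return sorted(glob.glob(os.path.join(basepath, pattern)))


with tempfile.TemporaryDirectory() as base:
    source_dir = os.path.join(base, "dags", "source")
    os.makedirs(source_dir)

    # No files yet, so a poke at this point would return False.
    before = glob_matches(base, "dags/source/*.csv")

    for name in ("a.csv", "b.csv", "notes.txt"):
        open(os.path.join(source_dir, name), "w").close()

    # Two of the three files match, so a poke would now return True.
    after = glob_matches(base, "dags/source/*.csv")
    found = [os.path.basename(p) for p in after]
    print(len(before) > 0, len(after) > 0, found)
```

Note that the sensor only checks that at least one file matches. It does not wait for a particular set of files, so any "all files present" check would have to compare the matched list against the names you expect.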

Upvotes: 1
