Reputation: 35
I am creating a DAG. I am trying to sense multiple files in a local folder, but the sensor does not pick up all the files present in the folder. What is the correct logic?
sensing_task = FileSensor(
    task_id='senseFile',
    filepath='dags/source/*.csv',
    poke_interval=10,
    fs_conn_id='fs_my_conn',
)
When it executes, it actually searches for a file literally named "*.csv" instead of treating "*.csv" as a pattern.
Upvotes: 2
Views: 2573
Reputation: 11
Not sure what pattern you are referring to in your comment: "it is actually searching for "*.csv" file and not the pattern."
The filepath is set to search for "*.csv": filepath='dags/source/*.csv'
I had the same requirement. My solution was to use the FileSensor as you did, and then create a second task using the BashOperator to run a bash script that processes one or more files. The script uses a for loop to go through all new files and a case statement to apply the logic associated with each filename pattern.
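For illustration, here is a rough Python sketch of the same loop-and-dispatch idea (the answer above used a bash script, which is not shown). The handler functions, the patterns in HANDLERS, and process_folder are all hypothetical names made up for this example, and tracking which files are "new" is left out.

```python
import fnmatch
import os

# Hypothetical handlers; replace with the real per-pattern logic.
def handle_sales(path):
    return "sales:" + os.path.basename(path)

def handle_other(path):
    return "other:" + os.path.basename(path)

# Ordered like the arms of a case statement: first matching pattern wins.
HANDLERS = [
    ("sales_*.csv", handle_sales),
    ("*.csv", handle_other),  # fallback for any other CSV file
]

def process_folder(folder):
    """Run the first matching handler on every regular file in folder."""
    results = []
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue
        for pattern, handler in HANDLERS:
            if fnmatch.fnmatch(name, pattern):
                results.append(handler(path))
                break  # stop at the first matching pattern
    return results
```

Files that match no pattern are skipped silently here; a real task might want to log or fail on them instead.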
Upvotes: 0
Reputation: 23684
Looks like you probably need to define your own sensor. Looking at the source code for FileSensor, you could probably do something like this to implement globbing:
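import glob
import os

# Import paths below are assumed for Airflow 1.10; they differ in Airflow 2.x.
from airflow.contrib.hooks.fs_hook import FSHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class GlobSensor(BaseSensorOperator):
    template_fields = ('filepath',)
    ui_color = '#91818a'

    @apply_defaults
    def __init__(self, filepath, fs_conn_id='fs_default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id

    def poke(self, context):
        # Resolve the pattern relative to the connection's base path.
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = os.path.join(basepath, self.filepath)
        self.log.info('Poking for glob %s', full_path)
        # Succeed as soon as at least one file matches the pattern.
        return len(glob.glob(full_path)) > 0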
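The poke method above comes down to a single glob.glob call, so the difference from a literal path check can be tried without Airflow. This is only a standalone sketch: literal_exists and glob_matches are hypothetical helper names, and the literal check is a stand-in for a sensor that does not expand wildcards, not a copy of FileSensor's code.

```python
import glob
import os

def literal_exists(base_path, filepath):
    """Treat filepath as a literal name; wildcards are NOT expanded."""
    return os.path.exists(os.path.join(base_path, filepath))

def glob_matches(base_path, pattern):
    """Return the sorted paths under base_path that match the glob pattern."""
    return sorted(glob.glob(os.path.join(base_path, pattern)))
```

With a folder containing a.csv and b.csv, literal_exists(folder, '*.csv') is false because no file is literally named "*.csv", while glob_matches(folder, '*.csv') returns both paths. Returning the list rather than a boolean also lets a downstream task know which files were found.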
Upvotes: 1