Reputation: 515
I am trying to check if a file exists on a remote server and, if it does, check whether its row count is greater than 0. If it is, the pipeline should proceed; if not, I want the sensor to keep checking (the file has a date in its name, so the next day's file may not be empty).
Could anyone shed some light on how to implement this? I am thinking: can I use an SFTP sensor from within the Python function that checks the rows? If so, how can I use a sensor from within another operator? Many thanks.
Upvotes: 0
Views: 606
Reputation: 5537
You can write a single custom sensor that achieves both tasks. Here is an outline of how to implement it: put this file in the plugins folder inside your Airflow home directory, and you can then import the sensor and use it as part of a DAG.
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
import logging

DEFAULT_CONNECTION_DETAILS = {"host": "127.0.0.1", "password": "wololo"}

log = logging.getLogger(__name__)


class Remote_File_Row_Sensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self, file_name, connection_details=DEFAULT_CONNECTION_DETAILS,
                 *args, **kwargs):
        super(Remote_File_Row_Sensor, self).__init__(*args, **kwargs)
        self.connection_details = connection_details
        self.file_name = file_name

    def poke(self, context):
        connection_details = self.connection_details
        file_name = self.file_name
        row_count = 0
        # Your code here: connect over SFTP and read the file for the row count.
        # Returning False makes the sensor keep poking; True lets the DAG proceed.
        return row_count > 0


class Remote_File_Row_Plugin(AirflowPlugin):
    name = "remote_file_row_sensor"
    operators = [Remote_File_Row_Sensor]
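The SFTP placeholder in poke() can be filled in with any SFTP client. Below is a minimal sketch assuming paramiko; the port (22), the "username" key in the connection dict, and the helper function names are my assumptions, not part of the original answer. The standalone poke function here stands in for the method body on the sensor class above.

```python
import io


def count_remote_rows(sftp_client, remote_path):
    # Works with paramiko.SFTPClient or any object whose open() returns a
    # file-like object; counts non-blank lines in the remote file.
    with sftp_client.open(remote_path) as remote_file:
        return sum(1 for line in remote_file if line.strip())


def open_sftp(connection_details):
    # Lazy import so the plugin module still loads where paramiko is absent.
    import paramiko
    transport = paramiko.Transport((connection_details["host"], 22))  # port assumed
    transport.connect(
        username=connection_details.get("username", "airflow"),  # key name assumed
        password=connection_details["password"],
    )
    return paramiko.SFTPClient.from_transport(transport)


def poke(self, context):
    # Stands in for Remote_File_Row_Sensor.poke above.
    sftp = open_sftp(self.connection_details)
    try:
        try:
            row_count = count_remote_rows(sftp, self.file_name)
        except IOError:
            return False  # file not present yet; keep poking
        return row_count > 0
    finally:
        sftp.close()
```

In a DAG you would then instantiate `Remote_File_Row_Sensor(task_id=..., file_name=..., poke_interval=...)` like any other sensor; the scheduler calls poke() until it returns True.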
Upvotes: 1