user3474606
user3474606

Reputation: 515

Airflow- how to use sensor from another sensor task

I am trying to check if a file exist on a remote server, if it does, check if the row count is 0. If the row count is greater than 0, The pipeline should proceed, if not, I want the sensor to keep checking ( the file has date in the name, so next day maybe the new file would be not empty)

Could anyone help shed some light on how to implement this? I am thinking can I use a SFTP sensor from within the python function that checks the rows? If so how could I use a sensor from within another? Many thanks

Upvotes: 0

Views: 606

Answers (1)

Meghdeep Ray
Meghdeep Ray

Reputation: 5537

You can just make a regular sensor that achieves both these tasks, here is an outline for how to implement this, you have to put this file within the plugins folder inside airflow and you can then import and use it as part of a DAG.

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators  import apply_defaults
from airflow.plugins_manager   import AirflowPlugin

import requests
import logging
import json

DEFAULT_CONNECTION_DETAILS = { "host": "127.0.0.1", "password": "wololo" }

log = logging.getLogger( __name__ )

class Remote_File_Row_Sensor( BaseSensorOperator ):

    @apply_defaults
    def __init__( self, file_name, connection_details= DEFAULT_CONNECTION_DETAILS, *args, **kwargs ):
        super( Remote_File_Row_Sensor, self ).__init__( *args, **kwargs )
        self.connection_details = connection_details
        self.file_name          = file_name

    def poke( self, context ):
        connection_details   = self.connection_details
        file_name            = self.file_name

        ROW_COUNT = 0

        # Your code here to connect using SFTP and read the file for the row count

        if ROW_COUNT == 0:
            return False
        else:
            return True

class Remote_File_Row_Plugin( AirflowPlugin ):
    name      = "remote_file_row_sensor"
    operators = [ Remote_File_Row_Sensor ]

Upvotes: 1

Related Questions