melchoir55

Reputation: 7276

airflow http callback sensor

Our Airflow implementation sends out HTTP requests to get services to do tasks. We want those services to let Airflow know when they complete their task, so we are sending a callback URL to each service, which it will call when its task is complete. I can't seem to find a callback sensor, however. How do people handle this normally?

Upvotes: 6

Views: 8610

Answers (1)

chrysanthos

Reputation: 1418

There is no such thing as a callback or webhook sensor in Airflow. The definition of a sensor, taken from the documentation, is:

Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

In other words, a sensor is an operator that polls an external system. This means your external services need a way of keeping state for each executed task - either internally or externally - so that a polling sensor can check on that state.

You can then use, for example, the airflow.operators.HttpSensor, which polls an HTTP endpoint until a condition is met. Or, even better, write your own custom sensor, which gives you the opportunity to do more complex processing and keep state. A minimal sketch of the first option follows.
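For illustration only - the connection id, endpoint, and JSON shape below are made up for the example, and the exact import path of HttpSensor varies between Airflow versions:

from airflow.sensors.http_sensor import HttpSensor

# Polls a hypothetical /status endpoint every 30 seconds until the
# external service reports the task as done. 'my_service', the
# endpoint and the 'state' field are assumptions for this sketch.
wait_for_task = HttpSensor(
    task_id='wait_for_task',
    http_conn_id='my_service',
    endpoint='tasks/1234/status',
    response_check=lambda response: response.json().get('state') == 'done',
    poke_interval=30,
    timeout=60 * 60,
    dag=dag)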

Alternatively, if the service writes its output to a storage system, you can use a sensor that polls a database, for example. I believe you get the idea.
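As a sketch of that variant, assuming the service records a row per completed task (the connection id, table, and query are hypothetical), the built-in SqlSensor keeps poking until its query returns a non-empty, non-zero result:

from airflow.sensors.sql_sensor import SqlSensor

# Succeeds once the service has recorded task 1234 as done.
# 'my_service_db' and the task_results table are assumptions.
wait_for_row = SqlSensor(
    task_id='wait_for_row',
    conn_id='my_service_db',
    sql="SELECT 1 FROM task_results WHERE task_id = 1234 AND state = 'done'",
    poke_interval=60,
    dag=dag)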

I'm attaching a custom operator example that I've written for integrating with the Apache Livy API. The operator does two things: a) submits a Spark job through the REST API and b) polls until the job has completed.

The operator extends SimpleHttpOperator and at the same time implements the poke behavior of HttpSensor, thus combining both functionalities.

import json
from datetime import datetime
from time import sleep

from airflow.exceptions import AirflowException, AirflowSensorTimeout
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults


class LivyBatchOperator(SimpleHttpOperator):
    """
    Submits a new Spark batch job through
    the Apache Livy REST API and polls it until completion.
    """

    template_fields = ('args',)
    ui_color = '#f4a460'

    @apply_defaults
    def __init__(self,
                 name,
                 className,
                 file,
                 executorMemory='1g',
                 driverMemory='512m',
                 driverCores=1,
                 executorCores=1,
                 numExecutors=1,
                 args=None,
                 conf=None,
                 timeout=120,
                 http_conn_id='apache_livy',
                 *arguments, **kwargs):
        """
        If xcom_push is True, the response of the HTTP request will also
        be pushed to an XCom.
        """
        super(LivyBatchOperator, self).__init__(
            endpoint='batches', *arguments, **kwargs)

        self.http_conn_id = http_conn_id
        self.method = 'POST'
        self.endpoint = 'batches'
        self.name = name
        self.className = className
        self.file = file
        self.executorMemory = executorMemory
        self.driverMemory = driverMemory
        self.driverCores = driverCores
        self.executorCores = executorCores
        self.numExecutors = numExecutors
        # avoid mutable default arguments
        self.args = args if args is not None else []
        self.conf = conf if conf is not None else {}
        self.timeout = timeout
        self.poke_interval = 10

    def execute(self, context):
        """
        Submits the batch and polls it until it finishes or times out.
        """
        payload = {
            "name": self.name,
            "className": self.className,
            "executorMemory": self.executorMemory,
            "driverMemory": self.driverMemory,
            "driverCores": self.driverCores,
            "executorCores": self.executorCores,
            "numExecutors": self.numExecutors,
            "file": self.file,
            "args": self.args,
            "conf": self.conf
        }
        self.log.info("Payload: %s", payload)
        headers = {
            'X-Requested-By': 'airflow',
            'Content-Type': 'application/json'
        }

        http = HttpHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Submitting batch through Apache Livy API")

        response = http.run(self.endpoint,
                            json.dumps(payload),
                            headers,
                            self.extra_options)

        # parse the JSON response
        obj = json.loads(response.content)

        # get the new batch id
        self.batch_id = obj['id']

        self.log.info('Batch successfully submitted with id %s', self.batch_id)

        # start polling the batch status
        started_at = datetime.utcnow()
        while not self.poke(context):
            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
                raise AirflowSensorTimeout('Snap. Time is OUT.')

            sleep(self.poke_interval)

        self.log.info("Batch %s has finished", self.batch_id)

    def poke(self, context):
        """
        Checks the state of the submitted batch: returns False while it
        is still starting or running, True once it has succeeded, and
        raises an exception for any other (failed) state.
        """
        http = HttpHook(method='GET', http_conn_id=self.http_conn_id)

        self.log.info("Calling Apache Livy API to get batch status")

        # call the API endpoint for this batch
        endpoint = 'batches/' + str(self.batch_id)
        response = http.run(endpoint)

        # parse the JSON response
        obj = json.loads(response.content)

        # get the current state of the batch
        state = obj['state']

        if state in ('starting', 'running'):
            # the batch has not finished yet; signal a new polling cycle
            self.log.info('Batch %s has not finished yet (%s)',
                          self.batch_id, state)
            return False
        elif state == 'success':
            # the batch completed successfully
            return True
        else:
            # any other state means the batch failed; terminate the task
            raise AirflowException(
                'Batch %s failed (%s)' % (self.batch_id, state))
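To give an idea of how it's used, here is a minimal sketch of wiring the operator into a DAG. The connection id, jar path, and class name below are placeholder values, not part of the operator itself:

from datetime import datetime
from airflow import DAG

dag = DAG('spark_batch_example',
          start_date=datetime(2018, 1, 1),
          schedule_interval=None)

# Submits the Spark job through Livy and blocks until it completes.
# 'apache_livy' must be an Airflow connection pointing at the Livy server;
# the jar path and class name are made up for this example.
submit_batch = LivyBatchOperator(
    task_id='submit_batch',
    name='my_spark_job',
    className='com.example.MySparkJob',
    file='hdfs:///jobs/my-spark-job.jar',
    timeout=600,
    http_conn_id='apache_livy',
    dag=dag)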

Hope this helps a bit.

Upvotes: 17
