Reputation: 7276
Our Airflow implementation sends out HTTP requests to get services to do tasks. We want those services to let Airflow know when they complete their task, so we send each service a callback URL that it calls when its task is complete. I can't seem to find a callback sensor, however. How do people normally handle this?
Upvotes: 6
Views: 8610
Reputation: 1418
There is no such thing as a callback or webhook sensor in Airflow. Here is how the documentation defines a sensor:
Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.
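To make the definition above concrete, here is a minimal sketch of the poke / poke_interval / timeout cycle in plain Python. It is not Airflow's actual implementation; `run_sensor` and `SensorTimeout` are made-up names used only for illustration.

```python
import time


class SensorTimeout(Exception):
    """Raised when the criterion is not met in time (hypothetical name)."""


def run_sensor(poke, poke_interval=1.0, timeout=10.0):
    """Call poke() repeatedly until it returns True.

    Returns the number of poke attempts that were needed.
    Raises SensorTimeout if more than `timeout` seconds have elapsed
    after an unsuccessful poke.
    """
    started_at = time.monotonic()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if time.monotonic() - started_at > timeout:
            raise SensorTimeout("criterion not met after %.1f s" % timeout)
        # Wait before checking the external system again.
        time.sleep(poke_interval)
```

The important point is that the sensor always asks the external system; nothing pushes a result into it.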
This means that a sensor is an operator that performs polling behavior on external systems. In that sense, your external services should have a way of keeping state for each executed task - either internally or externally - so that a polling sensor can check on that state.
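One way to reconcile a callback with polling, sketched under assumptions: whatever receives the callback records the task's state under a key, and the sensor's poke function only reads that state. `TaskStateStore`, `make_poke` and the state names "pending", "done" and "failed" are hypothetical. The in-memory dict is for illustration only; since Airflow tasks run in separate processes, a real setup would need a shared store such as a database or the service's own status endpoint.

```python
import threading


class TaskStateStore:
    """Keeps one state per task key (hypothetical, in-memory stand-in)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._states = {}

    def mark(self, task_key, state):
        # Called by whatever handles the service's callback.
        with self._lock:
            self._states[task_key] = state

    def get(self, task_key):
        # Unknown keys are treated as still pending.
        with self._lock:
            return self._states.get(task_key, "pending")


def make_poke(store, task_key):
    """Build a poke function that checks the recorded state of one task."""

    def poke():
        state = store.get(task_key)
        if state == "failed":
            raise RuntimeError("task %s failed" % task_key)
        return state == "done"

    return poke
```

For example, `make_poke(store, "job-1")()` returns False until something calls `store.mark("job-1", "done")`.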
For example, you can use airflow.operators.HttpSensor, which polls an HTTP endpoint until a condition is met. Better still, write your own custom sensor, which gives you the opportunity to do more complex processing and keep state.
Alternatively, if the service writes its output to a storage system, you can use a sensor that polls that system, for example a database. I believe you get the idea.
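As a rough sketch of the database variant, the check below could serve as the body of a poke method. It uses SQLite from the standard library purely for illustration; the `job_results` table and its `job_id` column are assumptions, not something the service in the question is known to provide.

```python
import sqlite3


def output_ready(conn, job_id):
    """Return True once the service has written at least one row for job_id.

    Assumes a hypothetical table job_results(job_id, payload).
    """
    row = conn.execute(
        "SELECT COUNT(*) FROM job_results WHERE job_id = ?", (job_id,)
    ).fetchone()
    return row[0] > 0
```

A sensor would call this at every poke interval and finish as soon as it returns True.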
I'm attaching a custom operator example that I've written for integrating with the Apache Livy API. The operator does two things: a) it submits a Spark job through the REST API and b) it waits for the job to complete.
The operator extends SimpleHttpOperator and adds its own poke method in the style of HttpSensor, thus combining both functionalities.
import json
from datetime import datetime
from time import sleep

# Import paths below are the Airflow 1.x locations; adjust for your version.
from airflow.exceptions import AirflowException, AirflowSensorTimeout
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults


class LivyBatchOperator(SimpleHttpOperator):
    """
    Submits a new Spark batch job through
    the Apache Livy REST API.
    """

    template_fields = ('args',)
    ui_color = '#f4a460'

    @apply_defaults
    def __init__(self,
                 name,
                 className,
                 file,
                 executorMemory='1g',
                 driverMemory='512m',
                 driverCores=1,
                 executorCores=1,
                 numExecutors=1,
                 args=None,
                 conf=None,
                 timeout=120,
                 http_conn_id='apache_livy',
                 *arguments, **kwargs):
        """
        If xcom_push is True, response of an HTTP request will also
        be pushed to an XCom.
        """
        super(LivyBatchOperator, self).__init__(
            endpoint='batches', *arguments, **kwargs)

        self.http_conn_id = http_conn_id
        self.method = 'POST'
        self.endpoint = 'batches'
        self.name = name
        self.className = className
        self.file = file
        self.executorMemory = executorMemory
        self.driverMemory = driverMemory
        self.driverCores = driverCores
        self.executorCores = executorCores
        self.numExecutors = numExecutors
        # avoid sharing mutable default arguments between instances
        self.args = args or []
        self.conf = conf or {}
        self.timeout = timeout
        self.poke_interval = 10

    def execute(self, context):
        """
        Submits the batch, then polls until it finishes or times out
        """
        payload = {
            "name": self.name,
            "className": self.className,
            "executorMemory": self.executorMemory,
            "driverMemory": self.driverMemory,
            "driverCores": self.driverCores,
            "executorCores": self.executorCores,
            "numExecutors": self.numExecutors,
            "file": self.file,
            "args": self.args,
            "conf": self.conf
        }
        self.log.info("Payload: %s", payload)

        headers = {
            'X-Requested-By': 'airflow',
            'Content-Type': 'application/json'
        }

        http = HttpHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Submitting batch through Apache Livy API")

        response = http.run(self.endpoint,
                            json.dumps(payload),
                            headers,
                            self.extra_options)

        # parse the JSON response
        obj = json.loads(response.content)

        # get the new batch Id
        self.batch_id = obj['id']
        self.log.info('Batch successfully submitted with Id %s', self.batch_id)

        # start polling the batch status
        started_at = datetime.utcnow()
        while not self.poke(context):
            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
                raise AirflowSensorTimeout('Snap. Time is OUT.')
            sleep(self.poke_interval)

        self.log.info("Batch %s has finished", self.batch_id)

    def poke(self, context):
        """
        Returns True once the batch has succeeded and False while it is
        still starting or running; raises for any other state.
        """
        http = HttpHook(method='GET', http_conn_id=self.http_conn_id)

        self.log.info("Calling Apache Livy API to get batch status")

        # call the API endpoint
        endpoint = 'batches/' + str(self.batch_id)
        response = http.run(endpoint)

        # parse the JSON response
        obj = json.loads(response.content)

        # get the current state of the batch
        state = obj['state']

        # check the batch state
        if state in ('starting', 'running'):
            # if state is 'starting' or 'running'
            # signal a new polling cycle
            self.log.info('Batch %s has not finished yet (%s)',
                          self.batch_id, state)
            return False
        elif state == 'success':
            # if state is 'success' exit
            return True
        else:
            # for all other states
            # raise an exception and
            # terminate the task
            raise AirflowException(
                'Batch ' + str(self.batch_id) + ' failed (' + state + ')')
Hope this will help you a bit.
Upvotes: 17