Reputation: 1754
I'm trying to use SimpleHttpOperator to consume a RESTful API. But, as the name suggests, it only supports the HTTP protocol, whereas I need to consume an HTTPS URI.
Now, I have to either use the requests library from Python or handle the invocation from within the application code. But this may not be the standard way, so I'm looking for any other options available to consume an HTTPS URI from within Airflow.
Upvotes: 12
Views: 28656
Reputation: 11
I am using Airflow 2.1.0, and the following setup works for an HTTPS API.
In the Connection UI, set the host name as usual; there is no need to specify 'https' in the schema field. Don't forget to set the login and password if your API server requires them.
When constructing your task, add the extra_options parameter to SimpleHttpOperator and put the path to your CA bundle certificate file as the value for the key verify. If you don't have a certificate file, use False to skip verification.
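As a rough illustration of the extra_options setting described above, here is a minimal sketch that only builds the keyword arguments for the operator. The helper name https_task_kwargs, the task id, the connection id, and the endpoint are all hypothetical placeholders, not values from this answer.

```python
def https_task_kwargs(ca_bundle=None):
    """Build keyword arguments for a SimpleHttpOperator calling an HTTPS API.

    ca_bundle is the path to a CA bundle file. When it is not given,
    certificate verification is switched off, which is best kept to testing.
    """
    return {
        "task_id": "call_https_api",     # hypothetical task name
        "http_conn_id": "my_https_api",  # hypothetical connection id
        "method": "GET",
        "endpoint": "/v1/status",        # hypothetical endpoint
        # "verify" is either a CA bundle path or False to skip verification
        "extra_options": {"verify": ca_bundle if ca_bundle else False},
    }


kwargs = https_task_kwargs("/etc/ssl/certs/ca-bundle.crt")
print(kwargs["extra_options"])
```

The resulting dictionary could then be passed along as SimpleHttpOperator(**kwargs) inside a DAG.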
Upvotes: 1
Reputation: 4472
In Airflow 2.x you can use HTTPS URLs by passing https as the schema value when setting up your connection, and you can still use SimpleHttpOperator as shown below.
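from airflow.providers.http.operators.http import SimpleHttpOperator

# get_data is assumed to be defined elsewhere in the DAG file
my_api = SimpleHttpOperator(
    task_id="my_api",
    http_conn_id="YOUR_CONN_ID",
    method="POST",
    endpoint="/base-path/end-point",
    data=get_data,
    headers={"Content-Type": "application/json"},
)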
Upvotes: 6
Reputation: 1177
In Airflow 2 the problem has been resolved. Just check that:
the host name in the Connection UI form does not end with /
the endpoint in SimpleHttpOperator starts with /
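To see why the two slashes matter, here is a minimal sketch that assumes the request URL is built by plain string concatenation of host and endpoint. That is an assumption made for illustration (join_url is a hypothetical helper), not Airflow's actual code.

```python
def join_url(host, endpoint):
    """Concatenate host and endpoint naively, with no slash handling."""
    return host + endpoint


# Host without trailing slash, endpoint with leading slash: a clean URL
print(join_url("https://example.com", "/v1/items"))

# Trailing slash on the host produces a double slash
print(join_url("https://example.com/", "/v1/items"))

# No slash on either side glues the path onto the host name
print(join_url("https://example.com", "v1/items"))
```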
Upvotes: 0
Reputation: 71
I was having the same problem with HTTP/HTTPS when trying to set the connections using environment variables (although it works when I set the connection in the UI).
I checked the issue @melchoir55 opened (https://issues.apache.org/jira/browse/AIRFLOW-2910), and you don't need to write a custom operator for this. The problem is not that HttpHook or HttpOperator can't use HTTPS; the problem is the way get_hook parses the connection string when dealing with HTTP: it treats the first part (http:// or https://) as the connection type.
In summary, you don't need a custom operator; you can just set the connection in your environment as follows:
AIRFLOW_CONN_HTTP_EXAMPLE=http://https%3a%2f%2fexample.com/
Instead of:
AIRFLOW_CONN_HTTP_EXAMPLE=https://example.com/
Or set the connection on the UI.
It is not an intuitive way to set up a connection, but I think they are working on a better way to parse connections for Airflow 2.0.
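If you would rather generate that encoded value than type it by hand, here is a minimal sketch using the standard library. The helper name airflow_http_conn_uri is hypothetical. Note that urllib emits uppercase hex digits (%3A%2F%2F), which is equivalent to the lowercase form shown above.

```python
from urllib.parse import quote


def airflow_http_conn_uri(target_url):
    """Wrap a real target URL in an http:// connection URI.

    Everything in target_url is percent-encoded, including ':' and '/',
    so the real scheme travels inside the host part of the URI.
    """
    return "http://" + quote(target_url, safe="") + "/"


uri = airflow_http_conn_uri("https://example.com")
print("AIRFLOW_CONN_HTTP_EXAMPLE=" + uri)
```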
Upvotes: 4
Reputation: 129
This is a couple of months old now, but for what it is worth I did not have any issue with making an HTTPS call on Airflow 1.10.2.
In my initial test I was making a request for templates from sendgrid, so the connection was set up like this:
Conn Id : sendgrid_templates_test
Conn Type : HTTP
Host : https://api.sendgrid.com/
Extra : { "authorization": "Bearer [my token]"}
and then in the dag code:
get_templates = SimpleHttpOperator(
    task_id='get_templates',
    method='GET',
    endpoint='/v3/templates',
    http_conn_id='sendgrid_templates_test',
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag,
)
and that worked. Also notice that my request happens after a Branch Operator, so I needed to set the trigger rule appropriately (to "all_done" to make sure it fires even when one of the branches is skipped), which has nothing to do with the question, but I just wanted to point it out.
Now to be clear, I did get an Insecure Request warning as I did not have certificate verification enabled. But you can see the resulting logs below
[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host: https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url: https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0
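The Extra field of the connection above is a JSON string. Here is a minimal sketch of how such a value can be parsed into a dictionary of request headers. The helper headers_from_extra is hypothetical, and whether your Airflow version applies Extra as headers is an assumption worth checking against its HttpHook.

```python
import json


def headers_from_extra(extra):
    """Parse a connection's Extra field (a JSON string) into a headers dict."""
    if not extra:
        return {}
    parsed = json.loads(extra)
    if not isinstance(parsed, dict):
        raise ValueError("Extra must be a JSON object")
    return parsed


extra = '{ "authorization": "Bearer [my token]"}'
print(headers_from_extra(extra))
```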
Upvotes: 6
Reputation: 1
Instead of implementing HttpsHook, we could just add one line of code to HttpsOperator (the SimpleHttpOperator subclass from the other answer), as follows:
...
        self.extra_options['verify'] = True
        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
...
Upvotes: 0
Reputation: 7286
I dove into this and am pretty sure that this behavior is a bug in Airflow. I have created a ticket for it here: https://issues.apache.org/jira/browse/AIRFLOW-2910
For now, the best you can do is override SimpleHttpOperator as well as HttpHook in order to change the way that HttpHook.get_conn works (to accept https). I may end up doing this, and if I do I'll post some code.
Update:
Operator override:
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    def execute(self, context):
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)
        self.log.info("Calling HTTP method")
        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text
Hook override:
from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        if "://" in conn.host:
            self.base_url = conn.host
        elif conn.schema:
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)

        return session
Usage:
Drop-in replacement for SimpleHttpOperator.
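To check how the hook override resolves the base URL without a running Airflow instance, here is a minimal standalone sketch that copies only the branching logic from get_conn above. The names resolve_base_url and make_conn are hypothetical, and the SimpleNamespace object is just a stand-in for an Airflow connection.

```python
from types import SimpleNamespace


def resolve_base_url(conn):
    """Mirror the base URL branches of HttpsHook.get_conn shown above."""
    if "://" in conn.host:
        base_url = conn.host                          # host already has a scheme
    elif conn.schema:
        base_url = conn.schema + "://" + conn.host    # explicit schema field
    elif conn.conn_type:
        base_url = conn.conn_type + "://" + conn.host # fall back to conn type
    else:
        base_url = "http://" + conn.host              # default to plain HTTP
    if conn.port:
        base_url = base_url + ":" + str(conn.port) + "/"
    return base_url


def make_conn(host, schema=None, conn_type=None, port=None):
    """Build a stand-in connection object for experimenting."""
    return SimpleNamespace(host=host, schema=schema, conn_type=conn_type, port=port)


print(resolve_base_url(make_conn("example.com", conn_type="https", port=8443)))
```

The order of the branches matters: a scheme embedded in the host wins over the schema field, which in turn wins over the connection type.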
Upvotes: 8