Reputation: 1
I’m trying to make an HTTPS call from an Airflow DAG. I've set up the connection with the appropriate Connection ID, Connection Type, and Host (URL). When I run the following command from the terminal, the DAG executes successfully:
airflow tasks test random_user_api_call get_random_user 2025-2-27
Output:
sushant@MacBook-Air airflow % airflow tasks test random_user_api_call get_random_user 2025-2-27
[2025-02-27T11:35:34.371+0530] {dagbag.py:588} INFO - Filling up the DagBag from /Users/sushant/airflow/dags
[2025-02-27T11:35:34.512+0530] {example_local_kubernetes_executor.py:40} WARNING - Could not import DAGs in example_local_kubernetes_executor.py
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/example_dags/example_local_kubernetes_executor.py", line 38, in <module>
from kubernetes.client import models as k8s
ModuleNotFoundError: No module named 'kubernetes'
[2025-02-27T11:35:34.512+0530] {example_local_kubernetes_executor.py:41} WARNING - Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]
[2025-02-27T11:35:36.862+0530] {example_kubernetes_executor.py:39} WARNING - The example_kubernetes_executor example DAG requires the kubernetes provider. Please install it with: pip install apache-airflow[cncf.kubernetes]
[2025-02-27T11:35:36.972+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user __airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__ [None]>
[2025-02-27T11:35:36.975+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user __airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__ [None]>
[2025-02-27T11:35:36.975+0530] {taskinstance.py:2867} INFO - Starting attempt 0 of 1
[2025-02-27T11:35:36.975+0530] {taskinstance.py:2948} WARNING - cannot record queued_duration for task get_random_user because previous state change time has not been saved
[2025-02-27T11:35:36.976+0530] {taskinstance.py:2890} INFO - Executing <Task(HttpOperator): get_random_user> on 2025-02-27 00:00:00+00:00
[2025-02-27T11:35:37.002+0530] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='random_user_api_call' AIRFLOW_CTX_TASK_ID='get_random_user' AIRFLOW_CTX_EXECUTION_DATE='2025-02-27T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__'
Task instance is in running state
Previous state of the Task instance: queued
Current task name:get_random_user state:None start_date:None
Dag name:random_user_api_call and current dag run status:queued
[2025-02-27T11:35:37.009+0530] {taskinstance.py:732} INFO - ::endgroup::
[2025-02-27T11:35:37.009+0530] {http.py:175} INFO - Calling HTTP method
[2025-02-27T11:35:37.027+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'
[2025-02-27T11:35:37.031+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'
[2025-02-27T11:35:37.791+0530] {http.py:236} INFO - {"results":[{"gender":"female","name":{"title":"Miss","first":"Dunja","last":"Jović"},"location":{"street":{"number":572,"name":"Afi Ekong"},"city":"Kragujevac","state":"South Bačka","country":"Serbia","postcode":52376,"coordinates":{"latitude":"56.6092","longitude":"100.3431"},"timezone":{"offset":"-2:00","description":"Mid-Atlantic"}},"email":"[email protected]","login":{"uuid":"3405005c-782d-43d0-8324-308b2f0b0d91","username":"crazyduck107","password":"br549","salt":"Fk2oZaam","md5":"2eac05c54a21b44a73051dcd2660b6b7","sha1":"298592b9ee1d14f892dfff1c4e18c1176f1ec960","sha256":"05244250832a9f3d683961520a793ae72d8c233e722d253b53cad6deaa8b8bb5"},"dob":{"date":"1975-10-15T04:25:17.267Z","age":49},"registered":{"date":"2016-06-07T20:49:12.478Z","age":8},"phone":"030-7086-121","cell":"066-4610-068","id":{"name":"SID","value":"949182649"},"picture":{"large":"https://randomuser.me/api/portraits/women/16.jpg","medium":"https://randomuser.me/api/portraits/med/women/16.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/women/16.jpg"},"nat":"RS"}],"info":{"seed":"3e2262ae2bbd0e2a","results":1,"page":1,"version":"1.4"}}
[2025-02-27T11:35:37.811+0530] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-02-27T11:35:37.812+0530] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=random_user_api_call, task_id=get_random_user, run_id=__airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__, execution_date=20250227T000000, start_date=, end_date=20250227T060537
sushant@MacBook-Air airflow % airflow tasks test random_user_api_call get_random_user 2025-2-27
Problem
However, when I trigger the DAG from the Airflow UI (http://localhost:8080/home), it fails with the following logs:
[2025-02-27T13:29:48.158+0530] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs [2025-02-27T13:29:48.171+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user manual__2025-02-27T07:59:36.087793+00:00 [queued]> [2025-02-27T13:29:48.205+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user manual__2025-02-27T07:59:36.087793+00:00 [queued]> [2025-02-27T13:29:48.206+0530] {taskinstance.py:2867} INFO - Starting attempt 1 of 1 [2025-02-27T13:29:48.265+0530] {taskinstance.py:2890} INFO - Executing <Task(HttpOperator): get_random_user> on 2025-02-27 07:59:36.087793+00:00 [2025-02-27T13:29:48.269+0530] {standard_task_runner.py:72} INFO - Started process 84466 to run task [2025-02-27T13:29:48.274+0530] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'random_user_api_call', 'get_random_user', 'manual__2025-02-27T07:59:36.087793+00:00', '--job-id', '45', '--raw', '--subdir', 'DAGS_FOLDER/user_processing.py', '--cfg-path', '/var/folders/r9/n9zyb5kn3c99xp4pqs__w8jc0000gn/T/tmperb0s6fj'] [2025-02-27T13:29:48.275+0530] {standard_task_runner.py:105} INFO - Job 45: Subtask get_random_user [2025-02-27T13:29:48.306+0530] {task_command.py:467} INFO - Running <TaskInstance: random_user_api_call.get_random_user manual__2025-02-27T07:59:36.087793+00:00 [running]> on host macbook-air.local [2025-02-27T13:29:48.351+0530] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='random_user_api_call' AIRFLOW_CTX_TASK_ID='get_random_user' AIRFLOW_CTX_EXECUTION_DATE='2025-02-27T07:59:36.087793+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-02-27T07:59:36.087793+00:00' [2025-02-27T13:29:48.352+0530] {logging_mixin.py:190} INFO - Task instance is in running state [2025-02-27T13:29:48.352+0530] {logging_mixin.py:190} INFO - Previous state of the Task instance: queued [2025-02-27T13:29:48.352+0530] {logging_mixin.py:190} INFO - Current task name:get_random_user state:running start_date:2025-02-27 07:59:48.171863+00:00 [2025-02-27T13:29:48.353+0530] {logging_mixin.py:190} INFO - Dag name:random_user_api_call and current dag run status:running [2025-02-27T13:29:48.353+0530] {taskinstance.py:732} INFO - ::endgroup:: [2025-02-27T13:29:48.353+0530] {http.py:175} INFO - Calling HTTP method [2025-02-27T13:29:48.357+0530] {base.py:84} INFO - Retrieving connection 'random_user_api' [2025-02-27T13:29:48.360+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'
My code:
from airflow import DAG
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.dates import days_ago
import json
# Define the DAG
dag = DAG(
'random_user_api_call', # Name of the DAGair
default_args={
'owner': 'airflow',
},
description='A simple DAG to fetch random user data via API',
schedule_interval=None, # You can set a schedule if you want it to run periodically
start_date=days_ago(1),
)
# Define the GET request task
get_random_user = HttpOperator(
task_id='get_random_user',
method='GET',
http_conn_id='random_user_api', # We will define the connection below
endpoint='/api', # The API endpoint
response_filter=lambda response: response.json(), # Parse response as JSON
log_response=True,
dag=dag,
)
# Define task flow
get_random_user
# Define the connection details
# CONNECTION_ID = "user_api"
# CONNECTION_TYPE = "http"
# CONNECTION_HOST = "https://randomuser.me"
# CONNECTION_PORT = None
I have tried this on local device but it has failed
I want the response in airflow UI.
Environment:
Upvotes: 0
Views: 10