Sushant Kumar


Airflow DAG Failing in UI but Succeeding via CLI

I'm trying to make an HTTPS call from an Airflow DAG. I've created a connection with the appropriate Connection ID, Connection Type, and Host (URL). When I run the task from the terminal with the following command, it succeeds:

airflow tasks test random_user_api_call get_random_user 2025-2-27

Output:

sushant@MacBook-Air airflow % airflow tasks test random_user_api_call get_random_user 2025-2-27
[2025-02-27T11:35:34.371+0530] {dagbag.py:588} INFO - Filling up the DagBag from /Users/sushant/airflow/dags
[2025-02-27T11:35:34.512+0530] {example_local_kubernetes_executor.py:40} WARNING - Could not import DAGs in example_local_kubernetes_executor.py
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/example_dags/example_local_kubernetes_executor.py", line 38, in <module>
    from kubernetes.client import models as k8s
ModuleNotFoundError: No module named 'kubernetes'
[2025-02-27T11:35:34.512+0530] {example_local_kubernetes_executor.py:41} WARNING - Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]
[2025-02-27T11:35:36.862+0530] {example_kubernetes_executor.py:39} WARNING - The example_kubernetes_executor example DAG requires the kubernetes provider. Please install it with: pip install apache-airflow[cncf.kubernetes]
[2025-02-27T11:35:36.972+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user __airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__ [None]>
[2025-02-27T11:35:36.975+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user __airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__ [None]>
[2025-02-27T11:35:36.975+0530] {taskinstance.py:2867} INFO - Starting attempt 0 of 1
[2025-02-27T11:35:36.975+0530] {taskinstance.py:2948} WARNING - cannot record queued_duration for task get_random_user because previous state change time has not been saved
[2025-02-27T11:35:36.976+0530] {taskinstance.py:2890} INFO - Executing <Task(HttpOperator): get_random_user> on 2025-02-27 00:00:00+00:00
[2025-02-27T11:35:37.002+0530] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='random_user_api_call' AIRFLOW_CTX_TASK_ID='get_random_user' AIRFLOW_CTX_EXECUTION_DATE='2025-02-27T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__'
Task instance is in running state
 Previous state of the Task instance: queued
Current task name:get_random_user state:None start_date:None
Dag name:random_user_api_call and current dag run status:queued
[2025-02-27T11:35:37.009+0530] {taskinstance.py:732} INFO - ::endgroup::
[2025-02-27T11:35:37.009+0530] {http.py:175} INFO - Calling HTTP method
[2025-02-27T11:35:37.027+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'
[2025-02-27T11:35:37.031+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'
[2025-02-27T11:35:37.791+0530] {http.py:236} INFO - {"results":[{"gender":"female","name":{"title":"Miss","first":"Dunja","last":"Jović"},"location":{"street":{"number":572,"name":"Afi Ekong"},"city":"Kragujevac","state":"South Bačka","country":"Serbia","postcode":52376,"coordinates":{"latitude":"56.6092","longitude":"100.3431"},"timezone":{"offset":"-2:00","description":"Mid-Atlantic"}},"email":"[email protected]","login":{"uuid":"3405005c-782d-43d0-8324-308b2f0b0d91","username":"crazyduck107","password":"br549","salt":"Fk2oZaam","md5":"2eac05c54a21b44a73051dcd2660b6b7","sha1":"298592b9ee1d14f892dfff1c4e18c1176f1ec960","sha256":"05244250832a9f3d683961520a793ae72d8c233e722d253b53cad6deaa8b8bb5"},"dob":{"date":"1975-10-15T04:25:17.267Z","age":49},"registered":{"date":"2016-06-07T20:49:12.478Z","age":8},"phone":"030-7086-121","cell":"066-4610-068","id":{"name":"SID","value":"949182649"},"picture":{"large":"https://randomuser.me/api/portraits/women/16.jpg","medium":"https://randomuser.me/api/portraits/med/women/16.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/women/16.jpg"},"nat":"RS"}],"info":{"seed":"3e2262ae2bbd0e2a","results":1,"page":1,"version":"1.4"}}
[2025-02-27T11:35:37.811+0530] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-02-27T11:35:37.812+0530] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=random_user_api_call, task_id=get_random_user, run_id=__airflow_temporary_run_2025-02-27T06:05:36.938577+00:00__, execution_date=20250227T000000, start_date=, end_date=20250227T060537

Problem

However, when I trigger the DAG from the Airflow UI (http://localhost:8080/home), it fails with the following logs:

[2025-02-27T13:29:48.158+0530] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2025-02-27T13:29:48.171+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user manual__2025-02-27T07:59:36.087793+00:00 [queued]>
[2025-02-27T13:29:48.205+0530] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: random_user_api_call.get_random_user manual__2025-02-27T07:59:36.087793+00:00 [queued]>
[2025-02-27T13:29:48.206+0530] {taskinstance.py:2867} INFO - Starting attempt 1 of 1
[2025-02-27T13:29:48.265+0530] {taskinstance.py:2890} INFO - Executing <Task(HttpOperator): get_random_user> on 2025-02-27 07:59:36.087793+00:00
[2025-02-27T13:29:48.269+0530] {standard_task_runner.py:72} INFO - Started process 84466 to run task
[2025-02-27T13:29:48.274+0530] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'random_user_api_call', 'get_random_user', 'manual__2025-02-27T07:59:36.087793+00:00', '--job-id', '45', '--raw', '--subdir', 'DAGS_FOLDER/user_processing.py', '--cfg-path', '/var/folders/r9/n9zyb5kn3c99xp4pqs__w8jc0000gn/T/tmperb0s6fj']
[2025-02-27T13:29:48.275+0530] {standard_task_runner.py:105} INFO - Job 45: Subtask get_random_user
[2025-02-27T13:29:48.306+0530] {task_command.py:467} INFO - Running <TaskInstance: random_user_api_call.get_random_user manual__2025-02-27T07:59:36.087793+00:00 [running]> on host macbook-air.local
[2025-02-27T13:29:48.351+0530] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='random_user_api_call' AIRFLOW_CTX_TASK_ID='get_random_user' AIRFLOW_CTX_EXECUTION_DATE='2025-02-27T07:59:36.087793+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-02-27T07:59:36.087793+00:00'
[2025-02-27T13:29:48.352+0530] {logging_mixin.py:190} INFO - Task instance is in running state
[2025-02-27T13:29:48.352+0530] {logging_mixin.py:190} INFO -  Previous state of the Task instance: queued
[2025-02-27T13:29:48.352+0530] {logging_mixin.py:190} INFO - Current task name:get_random_user state:running start_date:2025-02-27 07:59:48.171863+00:00
[2025-02-27T13:29:48.353+0530] {logging_mixin.py:190} INFO - Dag name:random_user_api_call and current dag run status:running
[2025-02-27T13:29:48.353+0530] {taskinstance.py:732} INFO - ::endgroup::
[2025-02-27T13:29:48.353+0530] {http.py:175} INFO - Calling HTTP method
[2025-02-27T13:29:48.357+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'
[2025-02-27T13:29:48.360+0530] {base.py:84} INFO - Retrieving connection 'random_user_api'

My code:

from airflow import DAG
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.dates import days_ago
import json

# Define the DAG
dag = DAG(
    'random_user_api_call',  # Name of the DAG
    default_args={
        'owner': 'airflow',
    },
    description='A simple DAG to fetch random user data via API',
    schedule_interval=None,  # You can set a schedule if you want it to run periodically
    start_date=days_ago(1),
)

# Define the GET request task
get_random_user = HttpOperator(
    task_id='get_random_user',
    method='GET',
    http_conn_id='random_user_api',  # We will define the connection below
    endpoint='/api',  # The API endpoint
    response_filter=lambda response: response.json(),  # Parse response as JSON
    log_response=True,
    dag=dag,
)



# Define task flow
get_random_user

 
# Define the connection details
# CONNECTION_ID = "random_user_api"
# CONNECTION_TYPE = "http"
# CONNECTION_HOST = "https://randomuser.me"
# CONNECTION_PORT = None  
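
For reference, the connection described in the comments above could also be supplied through an environment variable instead of the UI. The following is only a minimal sketch, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables) and assuming the connection ID is `random_user_api`, matching `http_conn_id` in the DAG. The helper name `connection_env_var` is hypothetical and not part of Airflow.

```python
import json
import os

# Assumption: the connection ID matches http_conn_id in the DAG.
CONN_ID = "random_user_api"


def connection_env_var(conn_id, conn_type, host):
    """Build the (name, value) pair for an Airflow connection env var.

    Hypothetical helper for illustration; Airflow itself only reads the
    resulting AIRFLOW_CONN_<CONN_ID> variable.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # JSON form of a connection; only the fields used in the post are set.
    value = json.dumps({"conn_type": conn_type, "host": host})
    return name, value


name, value = connection_env_var(CONN_ID, "http", "https://randomuser.me")

# Setting it here only affects this Python process and its children.
os.environ[name] = value

# Print a shell line that could be pasted before starting Airflow.
print(f"export {name}='{value}'")
```

Note that the variable would need to be exported in the shell that starts the scheduler and webserver, not only in the terminal used for `airflow tasks test`; otherwise the two execution paths may not see the same connection.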

I have tried this on my local machine, but the run triggered from the UI fails.

I want to see the API response in the Airflow UI.

Environment: macOS (MacBook Air), Python 3.10 (per the paths in the logs above), Airflow running locally.
