Reputation: 384
I am trying to run an Airflow DAG that queries the dag table in the Airflow Postgres database. Here is the code for the DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(year=2019, month=10, day=1),
    'retries': 0
}


def get_dag_table():
    query = 'SELECT * FROM dag LIMIT 5;'
    hook = PostgresHook(postgres_conn_id='postgres_default',
                        host='localhost',
                        database='airflow',
                        user='airflow',
                        password='airflow',
                        port=5432)
    connection = hook.get_conn()
    # COMMENTED OUT FOR DEBUGGING
    # cursor = connection.cursor()
    # cursor.execute(query)
    # return cursor.fetchall()


dag = DAG(
    "custom_postgres_tutorial",
    default_args=default_args,
    schedule_interval=None
)

start_task = DummyOperator(task_id='start_task', dag=dag)
postgres_task = PythonOperator(task_id='query_dag_table',
                               python_callable=get_dag_table,
                               dag=dag)

# Run the query task only after the start task has finished.
start_task >> postgres_task
Here are the steps that I followed:
1) I cloned the Puckel docker-airflow repo (https://github.com/puckel/docker-airflow).
2) I then ran the command $ docker-compose -f docker-compose-LocalExecutor.yml up -d to start an Airflow webserver and the Postgres database.
3) I created a custom connection that looks like this:
4) When I trigger the DAG I get the following error:
[2019-10-07 14:51:11,034] {{taskinstance.py:1078}} INFO - Marking task as FAILED.
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table Traceback (most recent call last):
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/bin/airflow", line 32, in <module>
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table args.func(args)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table return f(*args, **kwargs)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 522, in run
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table _run(args, dag, ti)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 440, in _run
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table pool=args.pool,
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table return func(*args, **kwargs)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table result = task_copy.execute(context=context)
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
[2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table return_value = self.execute_callable()
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/airflow/dags/tutorial-postgres.py", line 23, in get_dag_table
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table connection = hook.get_conn()
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/airflow/hooks/postgres_hook.py", line 75, in get_conn
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table self.conn = psycopg2.connect(**conn_args)
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 130, in connect
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table psycopg2.OperationalError: could not connect to server: Connection refused
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table Is the server running on host "localhost" (127.0.0.1) and accepting
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table TCP/IP connections on port 5432?
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table could not connect to server: Cannot assign requested address
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table Is the server running on host "localhost" (::1) and accepting
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table TCP/IP connections on port 5432?
[2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table
I have tried every suggestion that I could find online, and none of them has resolved the problem. I am confused because I can connect to the database using PyCharm:
Also, when I run the command $ docker container ls I get the following results, showing that the Postgres container publishes port 5432:
CONTAINER ID   IMAGE                          COMMAND                  CREATED       STATUS                 PORTS                                        NAMES
xxxxxxxxxxxx   puckel/docker-airflow:1.10.4   "/entrypoint.sh webs…"   2 hours ago   Up 2 hours (healthy)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   docker-airflow_webserver_1
xxxxxxxxxxxx   postgres:9.6                   "docker-entrypoint.s…"   2 days ago    Up 2 hours             0.0.0.0:5432->5432/tcp                       docker-airflow_postgres_1
Upvotes: 2
Views: 5658
Reputation: 11
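If host.docker.internal still does not work, try another approach: edit the docker-compose YAML file and publish the postgres service's port. That is, change the first block below into the second, which adds a ports entry: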
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

services:
  postgres:
    image: postgres:13
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
so that you can connect to PostgreSQL from the host machine using host: localhost and port: 5432.
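To see which host name actually reaches Postgres from wherever the task runs, a quick TCP probe can help. The following is only a minimal sketch using the Python standard library; the function names can_reach and probe_hosts are illustrative and not part of Airflow or Docker.

```python
import socket


def can_reach(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        # create_connection resolves the host name and attempts to connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


def probe_hosts(hosts, port, timeout=2.0):
    """Map each candidate host name to whether the port is reachable."""
    return {host: can_reach(host, port, timeout) for host in hosts}
```

For example, calling probe_hosts(["localhost", "postgres", "host.docker.internal"], 5432) from a Python shell inside the webserver container should show which of the candidate hosts accepts connections there. The result depends on the Docker setup, so run it in the same container as the failing task.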
Good luck:)
Upvotes: 0
Reputation: 118
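Try changing the Host field on the connection UI page to host.docker.internal or postgres instead of localhost. Inside the Airflow container, localhost refers to that container itself, not to the Postgres container, whereas postgres is the service name that docker-compose makes resolvable on its network.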
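As an alternative to editing the connection in the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a connection URI. The sketch below builds such a URI with the postgres host; the helper name build_conn_uri is hypothetical, and the credentials and database name are simply the ones shown in the question.

```python
from urllib.parse import quote


def build_conn_uri(user, password, host, port, database, scheme="postgres"):
    """Build a connection URI, percent-encoding the credentials."""
    return (
        f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


# Host is the docker-compose service name rather than localhost.
uri = build_conn_uri("airflow", "airflow", "postgres", 5432, "airflow")
print(f"AIRFLOW_CONN_POSTGRES_DEFAULT={uri}")
# prints: AIRFLOW_CONN_POSTGRES_DEFAULT=postgres://airflow:airflow@postgres:5432/airflow
```

The printed line could then be set in the environment of the Airflow containers, so that a hook created with postgres_conn_id='postgres_default' resolves to the Postgres service.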
Upvotes: 10