Reputation: 41
So in my case I previously ran Airflow locally, directly on my machine, and now I'm trying to run it through containers using Docker while also keeping the history of my previous DAGs. However, I've been having some issues.
A slight bit of background ... when I first used docker-compose to bring up my containers, Airflow threw an error saying that the column has_import_errors on the dag table
doesn't exist. So I just went ahead and created it manually, and everything seemed to work fine.
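For reference, the manual workaround was roughly the following; the exact column definition is from memory and the container name is just what docker-compose named my postgres service, so treat this as a sketch rather than the recommended fix (the cleaner route turned out to be airflow db upgrade, see the EDIT below):

# Sketch of the manual workaround (column definition guessed from the Airflow
# model; container name assumed from my compose project)
docker exec -it apache-airflow-postgres-1 \
  psql -U postgres -d airflowdb \
  -c "ALTER TABLE dag ADD COLUMN has_import_errors BOOLEAN NOT NULL DEFAULT FALSE;"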
Now, however, my DAGs are all broken. When I modify one without fixing the issue, I can see the updated line of code in the brief error information that shows up at the top of the webserver.
However, when I resolve the issue, the code doesn't change and the DAG remains broken.
I'll provide this image of the error, and this is the image of the code.
Also, the following is my docker-compose file. (I commented out airflow db init, but maybe I should have kept it with the db upgrade parameter set to true? See the note right after the compose file.) My compose file is based on this template:
version: '3.1'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # postgresql+psycopg2://postgres:airflow@localhost:5434/airflowdb
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflowdb
      PGPORT: 5434
    volumes:
      - pipeline-scripts_airflow-docker-db:/var/lib/postgresql/data
      # - postgres-db-volume:/var/lib/postgresql/data
    ports:
      - 5434:5434
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "postgres"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  # below here
  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

# volumes:
#   postgres-db-volume:
volumes:
  pipeline-scripts_airflow-docker-db:
    external: true
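Regarding the commented-out init service: since it isn't running, the one-off equivalent (once the stack is up) is just running the upgrade inside one of the Airflow containers, which is essentially what I ended up doing in the EDIT further down. Something like this, using the service name from the file above:

docker-compose exec airflow-webserver airflow db upgrade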
Also, the logs from my container are interesting; they are the following:
apache-airflow-airflow-scheduler-1 | Process DagFileProcessor4728-Process:
apache-airflow-airflow-scheduler-1 | Traceback (most recent call last):
apache-airflow-airflow-scheduler-1 | File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
apache-airflow-airflow-scheduler-1 | self.run()
apache-airflow-airflow-scheduler-1 | File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
apache-airflow-airflow-scheduler-1 | self._target(*self._args, **self._kwargs)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 168, in _run_file_processor
apache-airflow-airflow-scheduler-1 | callback_requests=callback_requests,
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1 | return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 663, in process_file
apache-airflow-airflow-scheduler-1 | dagbag.sync_to_db()
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1 | return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 608, in sync_to_db
apache-airflow-airflow-scheduler-1 | for attempt in run_with_db_retries(logger=self.log):
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 382, in __iter__
apache-airflow-airflow-scheduler-1 | do = self.iter(retry_state=retry_state)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 349, in iter
apache-airflow-airflow-scheduler-1 | return fut.result()
apache-airflow-airflow-scheduler-1 | File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
apache-airflow-airflow-scheduler-1 | return self.__get_result()
apache-airflow-airflow-scheduler-1 | File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
apache-airflow-airflow-scheduler-1 | raise self._exception
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 622, in sync_to_db
apache-airflow-airflow-scheduler-1 | DAG.bulk_write_to_db(self.dags.values(), session=session)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
apache-airflow-airflow-scheduler-1 | return func(*args, **kwargs)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in bulk_write_to_db
apache-airflow-airflow-scheduler-1 | most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in <dictcomp>
apache-airflow-airflow-scheduler-1 | most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 100, in instances
apache-airflow-airflow-scheduler-1 | cursor.close()
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
apache-airflow-airflow-scheduler-1 | with_traceback=exc_tb,
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
apache-airflow-airflow-scheduler-1 | raise exception
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in instances
apache-airflow-airflow-scheduler-1 | rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in <listcomp>
apache-airflow-airflow-scheduler-1 | rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 588, in _instance
apache-airflow-airflow-scheduler-1 | populators,
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 725, in _populate_full
apache-airflow-airflow-scheduler-1 | dict_[key] = getter(row)
apache-airflow-airflow-scheduler-1 | File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1723, in process
apache-airflow-airflow-scheduler-1 | return loads(value)
apache-airflow-airflow-scheduler-1 | ValueError: unsupported pickle protocol: 5
If any other information is needed I can gladly provide it.
EDIT:
Tiny update: I went ahead and ran
docker exec -it apache-airflow-airflow-webserver-1 bash
and then did
airflow db upgrade
because after all it's just Alembic and shouldn't delete my data. After doing that, it added the missing column itself.
So now when I look in the Postgres database I'm using, it shows that dag.has_import_errors
is false.
However, in the import_error table
I still have the same issue with the DAGs not updating.
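(For reference, this is roughly how I was checking it; the container name is whatever docker-compose gave the postgres service in my project, and the column names are from memory, so adjust as needed:)

# Inspect the metadata DB directly from the postgres container
docker exec -it apache-airflow-postgres-1 psql -U postgres -d airflowdb \
  -c "SELECT dag_id, has_import_errors FROM dag;" \
  -c "SELECT filename, timestamp FROM import_error;"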
Upvotes: 1
Views: 7910
Reputation: 41
LET'S GOOOOOOOOOO!
PIECE OF CAKE! (Pan comido! Du gâteau!)
Finally got it to work :). The main issue was the fact that I didn't have all the required packages. At first I tried just doing pip install configparser
inside the container, and this actually helped for one of the DAGs I had to run. However, this didn't seem sustainable or practical, so I decided to go ahead with the Dockerfile method, in effect extending the image (I believe that's what it's called).
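(The quick-and-dirty version of that first attempt looked something like this; it works, but it has to be repeated every time the container is recreated, and the container name here is just my compose project's worker, so adjust to yours:)

# Temporary fix: install the missing package directly into a running container
docker exec -it apache-airflow-airflow-worker-1 pip install configparser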
So here's my Dockerfile:
FROM apache/airflow:2.2.3-python3.8
COPY requirements.txt ./
RUN pip install -r requirements.txt
Now, two important things about this Dockerfile. One is that, of course, I install the dependencies that I may need, but some of my dependencies clashed with Airflow's, and I just decided to remove those from my requirements.txt
file.
The second thing is the python3.8
tag: this is what knocks out the error ValueError: unsupported pickle protocol: 5,
which will otherwise prevent you from seeing the histories of your DAGs (pickle protocol 5 only exists from Python 3.8 onwards, and the plain apache/airflow:2.2.3 image here was running Python 3.7, so it couldn't read what my previous local install, presumably on a newer Python, had pickled).
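If you're hitting the same thing, a quick way to check what the container can actually handle is to look at its Python version and highest pickle protocol; anything below 3.8 caps out at protocol 4 (container name taken from my scheduler logs above):

docker exec -it apache-airflow-airflow-scheduler-1 \
  python -c "import sys, pickle; print(sys.version); print(pickle.HIGHEST_PROTOCOL)"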
The other issue I had was finding a way to place a file in my container, such as my key file for an SSH operator, but that's another story :D.
Then, of course, in the docker-compose.yaml
file you'll have to edit it in the following way:
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3-python3.8}
  build: .
  environment:
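After switching from image: to build:, rebuild and recreate the services; these are just the standard docker-compose commands (flags as you prefer):

docker-compose build
docker-compose up -d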
These solved the bulk of the issues.
The only thing that kind of bothers me is that apache-airflow-airflow-webserver-1 shows up as red in the docker logs. I'm not sure if that's normal, but apart from this everything's healthy when I run docker ps.
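(If you want to double-check beyond the log colours, this is one way to do it; the /health endpoint is the same one the compose healthcheck hits:)

docker ps --filter name=airflow-webserver
curl --fail http://localhost:8080/health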
Upvotes: 3