Raindata

Reputation: 41

When running Apache Airflow in Docker, how can I fix the issue where my DAGs stay broken even after I fix them?


So in my case I previously ran Airflow locally, directly on my machine, and now I'm trying to run it through containers using Docker while also keeping the history of my previous DAGs. However, I've been having some issues.
A bit of background: when I first used docker-compose to bring up my containers, Airflow was sending an error message saying that the column dag_has_import_errors doesn't exist. So I just went ahead and created it, and everything seemed to work fine.
Now, however, my DAGs are all broken, and when I modify one without fixing the issue, I can see the updated line of code in the brief error information that shows up at the top of the webserver.
However, when I resolve the issue, the code doesn't change and the DAG remains broken. I'll provide
this image of the error
and this image of the code.

Also, the following is my docker-compose file. (I commented out the airflow db init part, but maybe I should have kept it with the db upgrade parameter set to 'true'? A sketch of that init service is shown right after the compose file below.) My compose file is based on this template:

version: '3.1'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    # postgresql+psycopg2://postgres:airflow@localhost:5434/airflowdb
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://postgres:airflow@postgres:5434/airflowdb
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflowdb
      PGPORT: 5434

    volumes:
      - pipeline-scripts_airflow-docker-db:/var/lib/postgresql/data
      # - postgres-db-volume:/var/lib/postgresql/data
    ports:
      - 5434:5434
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "postgres"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

#below here
  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      # airflow-init:
      #   condition: service_completed_successfully

# volumes:
#   postgres-db-volume:
volumes: 
    pipeline-scripts_airflow-docker-db:
        external: true
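
For reference, the airflow-init one-off service from the official template (the part I had commented out) looks roughly like the following. This is a simplified sketch rather than my exact config; the image entrypoint picks up these variables and runs the equivalent of airflow db upgrade (and creates the web UI user) before the other services start:

  # simplified sketch of the official template's airflow-init service
  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"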

Also, the logs from my scheduler container are interesting; they are the following:

apache-airflow-airflow-scheduler-1  | Process DagFileProcessor4728-Process:
apache-airflow-airflow-scheduler-1  | Traceback (most recent call last):
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
apache-airflow-airflow-scheduler-1  |     self.run()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
apache-airflow-airflow-scheduler-1  |     self._target(*self._args, **self._kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 168, in _run_file_processor
apache-airflow-airflow-scheduler-1  |     callback_requests=callback_requests,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 663, in process_file
apache-airflow-airflow-scheduler-1  |     dagbag.sync_to_db()
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 608, in sync_to_db
apache-airflow-airflow-scheduler-1  |     for attempt in run_with_db_retries(logger=self.log):
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 382, in __iter__
apache-airflow-airflow-scheduler-1  |     do = self.iter(retry_state=retry_state)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 349, in iter
apache-airflow-airflow-scheduler-1  |     return fut.result()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
apache-airflow-airflow-scheduler-1  |     return self.__get_result()
apache-airflow-airflow-scheduler-1  |   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
apache-airflow-airflow-scheduler-1  |     raise self._exception
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 622, in sync_to_db
apache-airflow-airflow-scheduler-1  |     DAG.bulk_write_to_db(self.dags.values(), session=session)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
apache-airflow-airflow-scheduler-1  |     return func(*args, **kwargs)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in bulk_write_to_db
apache-airflow-airflow-scheduler-1  |     most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2433, in <dictcomp>
apache-airflow-airflow-scheduler-1  |     most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 100, in instances
apache-airflow-airflow-scheduler-1  |     cursor.close()
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
apache-airflow-airflow-scheduler-1  |     with_traceback=exc_tb,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
apache-airflow-airflow-scheduler-1  |     raise exception
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in instances
apache-airflow-airflow-scheduler-1  |     rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 80, in <listcomp>
apache-airflow-airflow-scheduler-1  |     rows = [proc(row) for row in fetch]
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 588, in _instance
apache-airflow-airflow-scheduler-1  |     populators,
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 725, in _populate_full
apache-airflow-airflow-scheduler-1  |     dict_[key] = getter(row)
apache-airflow-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1723, in process
apache-airflow-airflow-scheduler-1  |     return loads(value)
apache-airflow-airflow-scheduler-1  | ValueError: unsupported pickle protocol: 5

If any other information is needed I can gladly provide it.
EDIT: Tiny update. I went ahead and ran docker exec -it apache-airflow-airflow-webserver-1 bash and then did airflow db upgrade, because after all it's just Alembic and shouldn't delete my data.
After doing that, it added the missing column itself, and now when I look in the Postgres database I'm using, it shows that dag.has_import_errors is false.
However, in the import_error table I still have the same issue with the DAGs not updating.
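
In case it helps, these were the exact commands (run in the webserver container, which uses the same Airflow install and metadata DB connection as the scheduler):

# open a shell inside the running webserver container
docker exec -it apache-airflow-airflow-webserver-1 bash

# inside the container: run the Alembic migrations against the metadata DB;
# this only applies schema changes and doesn't delete existing data
airflow db upgrade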

Upvotes: 1

Views: 7910

Answers (1)

Raindata

Reputation: 41

LET'S GOOOOOOOOOO!
Piece of cake!
Easy as pie!
Finally got it to work :). The main issue was that I didn't have all the required packages installed in the containers. I first tried just running pip install configparser inside the container, and that actually helped for one of the DAGs I had to run. However, this didn't seem sustainable or practical, so I decided to go ahead with the Dockerfile method, in effect extending the image (I believe that's what it's called). So here's my Dockerfile:

FROM apache/airflow:2.2.3-python3.8

COPY requirements.txt ./

RUN pip install -r requirements.txt

Now, two important things about this Dockerfile. The first is that, of course, I install the dependencies I may need, but some of my dependencies clashed with Airflow's, so I just decided to remove those from my requirements.txt file.
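
For what it's worth, the requirements.txt that sits next to the Dockerfile ended up being just the handful of extra packages my DAGs import. The exact contents are specific to my project, so treat this as an illustration only:

# requirements.txt (illustrative): only the packages my DAGs actually import,
# with anything that clashed with the image's own pins removed
configparser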
The second thing is adding python3.8 to the image tag: this actually knocks out the ValueError: unsupported pickle protocol: 5 error, which would otherwise prevent you from seeing the history of your DAGs. Pickle protocol 5 was only introduced in Python 3.8, so data written by my local Python 3.8 setup couldn't be unpickled by the default image's Python 3.7.
The other issue I had was finding a way to place a file in my container, such as my key file for an SSH operator, but that's another story :D.
Then of course you'll have to edit the docker-compose.yaml file in the following way:

  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3-python3.8}
  build: .
  environment:
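
After that edit, the image has to be rebuilt and the containers recreated so the extended image is actually used (as the template's own comment about the build line suggests):

# build the extended image from the Dockerfile placed next to docker-compose.yaml
docker-compose build

# recreate the services with the freshly built image
docker-compose up -d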

These solved the bulk of the issues.
The only thing that kind of bothers me is that apache-airflow-airflow-webserver-1 shows up in red in the docker logs. I'm not sure if that's normal, but apart from that, everything reports healthy when I run docker ps.

Upvotes: 3
