Reputation: 1246
First, I'm new to Airflow and Python. In the past I installed Airflow 2.3.3 (and some providers) on WSL2, and my DAG worked as expected without any errors. My DAG:
import os
from datetime import datetime
import logging

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryDeleteTableOperator,
    BigQueryGetDatasetTablesOperator,
    BigQueryUpdateDatasetOperator,
    BigQueryUpdateTableOperator,
    BigQueryUpdateTableSchemaOperator,
    BigQueryUpsertTableOperator,
    BigQueryInsertJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

DAG_ID = "bigquery_dataset"
DATASET_NAME = f"dataset_{DAG_ID}"

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2022, 8, 16),
    catchup=False,
    tags=["example", "bigquery"],
) as dag:
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id=DATASET_NAME,
        exists_ok=True,
    )
    create_dataset
Now I have to downgrade Airflow from 2.3.3 to 2.2.5.
Steps:
Now I recreate and trigger my DAG, but I get this error:
scheduler | [2022-08-19 15:31:05,927] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bigquery_dataset', 'create_view', 'manual__2022-08-19T08:30:58.710367+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_bigquery.py']
scheduler | [2022-08-19 15:31:07,217] {dagbag.py:500} INFO - Filling up the DagBag from /home/bao/airflow/dags/test_bigquery.py
scheduler | Running <TaskInstance: bigquery_dataset.create_view manual__2022-08-19T08:30:58.710367+00:00 [queued]> on host DESKTOP-H8O5RAP.localdomain
scheduler | Traceback (most recent call last):
scheduler | File "/home/bao/.local/bin/airflow", line 8, in <module>
scheduler | sys.exit(main())
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
scheduler | args.func(args)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
scheduler | return func(*args, **kwargs)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
scheduler | return f(*args, **kwargs)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
scheduler | _run_task_by_selected_method(args, dag, ti)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
scheduler | _run_task_by_local_task_job(args, ti)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
scheduler | run_job.run()
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 246, in run
scheduler | self._execute()
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 78, in _execute
scheduler | self.task_runner = get_task_runner(self)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/task/task_runner/__init__.py", line 63, in get_task_runner
scheduler | task_runner = task_runner_class(local_task_job)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 35, in __init__
scheduler | super().__init__(local_task_job)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/task/task_runner/base_task_runner.py", line 48, in __init__
scheduler | super().__init__(local_task_job.task_instance)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 40, in __init__
scheduler | self._set_context(context)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 54, in _set_context
scheduler | set_context(self.log, context)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 178, in set_context
scheduler | handler.set_context(value)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 60, in set_context
scheduler | local_loc = self._init_file(ti)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 283, in _init_file
scheduler | relative_path = self._render_filename(ti, ti.try_number)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 85, in _render_filename
scheduler | return render_template_to_string(self.filename_jinja_template, context)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/helpers.py", line 268, in render_template_to_string
scheduler | return render_template(template, context, native=False)
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/airflow/utils/helpers.py", line 263, in render_template
scheduler | return "".join(nodes)
scheduler | File "<template>", line 20, in root
scheduler | File "/home/bao/.local/lib/python3.8/site-packages/jinja2/runtime.py", line 903, in _fail_with_undefined_error
scheduler | raise self._undefined_exception(self._undefined_message)
scheduler | jinja2.exceptions.UndefinedError: 'airflow.models.taskinstance.TaskInstance object' has no attribute 'map_index'
scheduler | [2022-08-19 15:31:08,234] {sequential_executor.py:66} ERROR - Failed to execute task Command '['airflow', 'tasks', 'run', 'bigquery_dataset', 'create_view', 'manual__2022-08-19T08:30:58.710367+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_bigquery.py']' returned non-zero exit status 1..
The log says that Jinja2 tried to render the TaskInstance but failed because there is no attribute 'map_index', right? Is there a version conflict, or is something wrong with my code (or my environment)? Thank you.
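For context, the failure can be reproduced outside Airflow. The sketch below (my own illustration, not code from Airflow itself; `FakeTaskInstance` is a hypothetical stand-in) assumes that Airflow 2.3's default log-filename template compares `ti.map_index` against 0, which raises `UndefinedError` when the object has no such attribute — the same `_fail_with_undefined_error` path shown in the traceback:

```python
import jinja2


class FakeTaskInstance:
    """Hypothetical stand-in for a pre-2.3 TaskInstance, which has no map_index."""

    task_id = "create_view"


# Comparing an undefined attribute (ti.map_index >= 0) makes Jinja2's
# default Undefined raise, rather than silently rendering an empty string.
template = jinja2.Environment().from_string(
    "{{ ti.task_id }}/"
    "{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}"
)

try:
    template.render(ti=FakeTaskInstance())
except jinja2.exceptions.UndefinedError as exc:
    print(exc)  # same "has no attribute 'map_index'" error as in the scheduler log
```

This is why the error points at the metadata/template state rather than at the DAG file: the DAG code never mentions `map_index` at all.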
Upvotes: 0
Views: 1787
Reputation: 278
It looks like your database schema is out of sync with your Airflow version. Or, actually, the database may be correct and the installed Airflow version is somehow wrong.
Try checking the Airflow version, then either migrate your database or reinstall Airflow:
airflow version
airflow db upgrade # If the Airflow version is correct
pip3 install apache-airflow==2.2.5 # If not
I'm not sure whether this command can downgrade the database, but if you were moving from 2.2.5 to 2.3.3 it would definitely help.
There is more about this at https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading.html, https://airflow.apache.org/docs/apache-airflow/stable/migrations-ref.html and https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#db
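For the downgrade direction specifically, one possible sequence is sketched below. It assumes the 2.3.3 install is still on the machine and relies on `airflow db downgrade`, which only exists in Airflow 2.3.0 and later, so it must run *before* the package is downgraded (the constraints URL and Python version are illustrative; adjust them to your environment):

```shell
# Confirm which version the CLI actually resolves to
airflow version

# While still on 2.3.3, roll the metadata DB back to the 2.2.5 schema
# (airflow db downgrade is only available in Airflow >= 2.3.0)
airflow db downgrade --to-version "2.2.5"

# Then install the older package; note the PyPI name is apache-airflow,
# and a constraints file keeps the dependency set consistent
pip3 install "apache-airflow==2.2.5" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.5/constraints-3.8.txt"
```

If the newer install is already gone, the remaining options are to recreate the metadata DB with `airflow db init` (losing history) or to restore it from a backup taken before the 2.3.3 migration.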
Upvotes: 1