KristiLuna

Reputation: 1903

Using Airflow: can't compare offset-naive and offset-aware datetimes

I'm seeing the following error when trying to run task t1. Previously we were using BigQueryOperator, but we are now trying to get BigQueryInsertJobOperator working, which is when this began happening:

File "", line 1, in top-level template code
File "/usr/local/airflow/repo/dags/article_traffic/sql/bigquery_extract.sql", line 2, in top-level template code
    {% for vertical in get_verticals(execution_date) %}
File "/usr/local/lib/python3.8/site-packages/jinja2/runtime.py", line 545, in next
    rv = next(self._iterator)
File "/usr/local/airflow/repo/dags/article_traffic/utils.py", line 28, in isactive
    return fdt <= execdt <= tdt
TypeError: can't compare offset-naive and offset-aware datetimes

Below is our code, with task t1:

def extract_from_bigquery(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        f'{parent_dag_name}.{child_dag_name}',
        schedule_interval=schedule_interval,
        start_date=start_date,
        user_defined_macros={
            'get_verticals': util.get_activeverticals,
        },
    )


    t1 = BigQueryInsertJobOperator(
        dag=dag,
        task_id='bq_query',
        gcp_conn_id='google_cloud_default',
        params={'data': util.querycontext},
        configuration={
            "query": {"query": "{% include 'sql/bigquery_extract.sql' %}"}
        }
    )

get_activeverticals is the method below, which is throwing the error:

def get_activeverticals(self, execdt):

    def isactive(v):
        fdt = datetime(*[int(dpart) for dpart in v.get('from', '1980-01-01').split('-')])
        tdt = datetime(*[int(dpart) for dpart in v.get('to', '3000-01-01').split('-')])

        return fdt <= execdt <= tdt

    active_verticals = filter(isactive, self.querycontext['verticals'])

    return active_verticals

Upvotes: 1

Views: 1134

Answers (1)

SergiyKolesnikov

Reputation: 7815

By default, Python's standard datetime() produces naive datetime objects (no timezone information). Airflow uses aware datetime objects, so the execution_date passed to your macro carries a timezone.

I suspect that the datetime() in your get_activeverticals() is imported from Python's standard datetime module. If so, in fdt <= execdt <= tdt you are comparing naive (fdt, tdt) and aware (execdt) datetime objects. Python does not allow that comparison, so you get this TypeError.
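The mismatch can be reproduced outside Airflow with the standard library alone. This is only a sketch: the aware value below stands in for Airflow's execution_date, and the specific dates are made up for illustration.

```python
from datetime import datetime, timezone

# No tzinfo given, so this is a naive datetime (like fdt and tdt in the question).
naive = datetime(1980, 1, 1)

# Stand-in for the aware execution_date that Airflow passes to the macro.
aware = datetime(2021, 6, 1, tzinfo=timezone.utc)

try:
    naive <= aware
    error = None
except TypeError as exc:
    # Ordering comparisons between naive and aware datetimes raise TypeError.
    error = str(exc)

print(error)
```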

A quick fix would be to use pendulum's datetime() instead, which produces an aware datetime object:

from pendulum import datetime
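If you would rather not depend on pendulum in that module, a similar effect can be had with the standard library by attaching a timezone when building the bounds. The following is a minimal sketch, not your actual code: parse_utc_date, get_active_verticals as a plain function, and the sample verticals are hypothetical, and it assumes the 'from' and 'to' dates should be interpreted as midnight UTC.

```python
from datetime import datetime, timezone


def parse_utc_date(text):
    """Parse 'YYYY-MM-DD' into a timezone-aware datetime at midnight UTC."""
    year, month, day = (int(part) for part in text.split("-"))
    return datetime(year, month, day, tzinfo=timezone.utc)


def get_active_verticals(verticals, execdt):
    """Return the verticals whose [from, to] window contains execdt."""

    def is_active(v):
        # Both bounds are aware, so they can be compared with an aware execdt.
        fdt = parse_utc_date(v.get("from", "1980-01-01"))
        tdt = parse_utc_date(v.get("to", "3000-01-01"))
        return fdt <= execdt <= tdt

    return [v for v in verticals if is_active(v)]


# Hypothetical sample data; the real querycontext is not shown in the question.
verticals = [
    {"name": "news", "from": "2020-01-01"},
    {"name": "sports", "to": "2019-12-31"},
]
execution_date = datetime(2021, 6, 1, tzinfo=timezone.utc)
active = get_active_verticals(verticals, execution_date)
print([v["name"] for v in active])
```

Whichever library you use, the point is the same: fdt, tdt, and execdt must all be aware (or all naive) before they are compared.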

Upvotes: 1
