Reputation: 1903
I'm seeing the following error when trying to run task t1. We were previously using BigQueryOperator and are now trying to get BigQueryInsertJobOperator working, which is when this began happening:
  File "", line 1, in top-level template code
  File "/usr/local/airflow/repo/dags/article_traffic/sql/bigquery_extract.sql", line 2, in top-level template code
    {% for vertical in get_verticals(execution_date) %}
  File "/usr/local/lib/python3.8/site-packages/jinja2/runtime.py", line 545, in __next__
    rv = next(self._iterator)
  File "/usr/local/airflow/repo/dags/article_traffic/utils.py", line 28, in isactive
    return fdt <= execdt <= tdt
TypeError: can't compare offset-naive and offset-aware datetimes
Below is our code, with task t1:
def extract_from_bigquery(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        f'{parent_dag_name}.{child_dag_name}',
        schedule_interval=schedule_interval,
        start_date=start_date,
        user_defined_macros={
            'get_verticals': util.get_activeverticals,
        },
    )
    t1 = BigQueryInsertJobOperator(
        dag=dag,
        task_id='bq_query',
        gcp_conn_id='google_cloud_default',
        params={'data': util.querycontext},
        configuration={
            "query": {"query": "{% include 'sql/bigquery_extract.sql' %}"}
        }
    )
get_activeverticals is the method below, which is the one throwing the error:
def get_activeverticals(self, execdt):
    def isactive(v):
        fdt = datetime(*[int(dpart) for dpart in v.get('from', '1980-01-01').split('-')])
        tdt = datetime(*[int(dpart) for dpart in v.get('to', '3000-01-01').split('-')])
        return fdt <= execdt <= tdt
    active_verticals = filter(isactive, self.querycontext['verticals'])
    return active_verticals
Upvotes: 1
Views: 1134
Reputation: 7815
Python's standard datetime() produces naive datetime objects by default, while Airflow uses aware datetime objects. I suspect that datetime() in your get_activeverticals() is imported from Python's standard datetime module. So in fdt <= execdt <= tdt you are comparing naive (fdt, tdt) and aware (execdt) datetime objects. Python does not allow that comparison, so you get the TypeError.
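The mismatch can be reproduced outside Airflow with the standard library alone. This is a minimal sketch; the variable names are illustrative, and the aware value only stands in for the execution_date that Airflow passes to the macro.

```python
from datetime import datetime, timezone

# Naive: no tzinfo attached, like the bounds built in isactive().
naive_bound = datetime(1980, 1, 1)

# Aware: carries tzinfo; a stand-in for Airflow's execution_date.
aware_execdt = datetime(2021, 6, 1, tzinfo=timezone.utc)

try:
    naive_bound <= aware_execdt
    error_message = None
except TypeError as exc:
    # Ordering comparisons between naive and aware datetimes raise TypeError.
    error_message = str(exc)

print(error_message)
```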
A quick fix would be to use pendulum's datetime() instead, which produces an aware datetime object (in UTC unless you pass another tz):
from pendulum import datetime
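If you would rather not depend on pendulum here, the same idea can be sketched with the standard library by attaching a timezone to the bounds explicitly. This is only an illustration under stated assumptions: the bounds are treated as midnight UTC, the function is written as a standalone helper rather than a method, and parse_utc_date, get_active_verticals, the sample data and its 'name' key are hypothetical names not taken from the question.

```python
from datetime import datetime, timezone


def parse_utc_date(text):
    """Parse 'YYYY-MM-DD' into an aware datetime at midnight UTC (assumed zone)."""
    year, month, day = (int(part) for part in text.split('-'))
    return datetime(year, month, day, tzinfo=timezone.utc)


def get_active_verticals(verticals, execdt):
    """Return the verticals whose [from, to] window contains the aware execdt."""
    def isactive(v):
        fdt = parse_utc_date(v.get('from', '1980-01-01'))
        tdt = parse_utc_date(v.get('to', '3000-01-01'))
        # All three values are aware now, so the comparison is allowed.
        return fdt <= execdt <= tdt
    return [v for v in verticals if isactive(v)]


# Hypothetical sample data, only to exercise the function.
sample_verticals = [
    {'name': 'news', 'from': '2020-01-01'},
    {'name': 'sports', 'to': '2019-12-31'},
]
execution_date = datetime(2021, 6, 1, tzinfo=timezone.utc)

active_names = [v['name'] for v in get_active_verticals(sample_verticals, execution_date)]
print(active_names)
```

Whether UTC is the right zone for the from/to dates depends on how those dates are defined in your query context, so treat that choice as something to verify.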
Upvotes: 1