Reputation: 337
I am running Airflow v1.9.0 with the Celery Executor. I have configured different workers with different queue names (DEV, QA, UAT, PROD). I have written a custom sensor that polls a source DB connection and a target DB connection, runs different queries, and performs some checks before triggering downstream tasks. This has been running fine on multiple workers, but on one of them the sensor raises an AttributeError:
$ airflow test PDI_Incr_20190407_v1 checkCCWatermarkDt 2019-04-09
[2019-04-09 10:02:57,769] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2019-04-09 10:02:57,770] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2019-04-09 10:02:57,771] {__init__.py:45} INFO - Using executor CeleryExecutor
[2019-04-09 10:02:57,817] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags
/usr/local/lib/python2.7/site-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to ExternalTaskSensor. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'check_existence': True}
category=PendingDeprecationWarning
[2019-04-09 10:02:57,989] {base_hook.py:80} INFO - Using connection to: 172.16.20.11:1521/GWPROD
[2019-04-09 10:02:57,991] {base_hook.py:80} INFO - Using connection to: dmuat.cwmcwghvymd3.us-east-1.rds.amazonaws.com:1521/DMUAT
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):
  File "/home/airflow/airflow/plugins/PDIPlugin.py", line 29, in poke
    wm_dt_src = hook_src.get_records(self.sql)
AttributeError: 'NoneType' object has no attribute 'get_records'
However, when I run the same test command from the scheduler host's CLI, it runs fine. The above issue looks like a database connection issue.
For debugging, I checked the DB connections from the Airflow UI (Data Profiling -> Ad Hoc Query) with the query: SELECT 1 FROM dual; -- this worked fine.
I also telnetted from the worker node to the DB host and port, and that also went fine.
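As a further check, the hook resolution itself can be inspected from a Python shell on the affected worker (a minimal sketch, using the same connection IDs as in the DAG snippet below):

# Sketch: verify that Airflow can resolve both connections and build a hook for each.
from airflow.hooks.base_hook import BaseHook

for conn_id in ('CCUSER_SOURCE_GWPROD_RPT', 'RDS_DMUAT_DMCONFIG'):
    conn = BaseHook.get_connection(conn_id)
    # A hook of None here would explain the AttributeError in poke()
    print(conn_id, 'conn_type =', conn.conn_type, 'hook =', conn.get_hook())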
Custom Sensor Code:
from airflow.plugins_manager import AirflowPlugin
from airflow.hooks.base_hook import BaseHook
from airflow.operators.sensors import SqlSensor


class SensorWatermarkDt(SqlSensor):
    def __init__(self, conn_id, sql, conn_id_tgt, sql_tgt, *args, **kwargs):
        self.sql = sql
        self.conn_id = conn_id
        self.sql_tgt = sql_tgt
        self.conn_id_tgt = conn_id_tgt
        super(SqlSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        hook_src = BaseHook.get_connection(self.conn_id).get_hook()
        hook_tgt = BaseHook.get_connection(self.conn_id_tgt).get_hook()
        self.log.info('Poking: %s', self.sql)
        self.log.info('Poking: %s', self.sql_tgt)
        wm_dt_src = hook_src.get_records(self.sql)
        wm_dt_tgt = hook_tgt.get_records(self.sql_tgt)
        if wm_dt_src <= wm_dt_tgt:
            return False
        else:
            return True


class PDIPlugin(AirflowPlugin):
    name = "PDIPlugin"
    operators = [SensorWatermarkDt]
Airflow DAG Snippet:
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import timedelta, datetime
from airflow.operators import SensorWatermarkDt
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'SenseTeam',
    # 'depends_on_past': True,
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 7, 17, 00),
    'email': [],
    'email_on_failure': False,
    'email_on_retry': False,
    'queue': 'PENTAHO_UAT'
}

dag = DAG(dag_id='PDI_Incr_20190407_v1',
          default_args=default_args,
          max_active_runs=1,
          concurrency=1,
          catchup=False,
          schedule_interval=timedelta(hours=24),
          dagrun_timeout=timedelta(minutes=23 * 60))

checkCCWatermarkDt = \
    SensorWatermarkDt(task_id='checkCCWatermarkDt',
                      conn_id='CCUSER_SOURCE_GWPROD_RPT',
                      sql="SELECT MAX(CC_WM.CREATETIME) as CURRENT_WATERMARK_DATE FROM CCUSER.CCX_CAPTUREREASON_ETL CC_WM INNER JOIN CCUSER.CCTL_CAPTUREREASON_ETL CC_WMLKP ON CC_WM.CAPTUREREASON_ETL = CC_WMLKP.ID AND UPPER(CC_WMLKP.DESCRIPTION)= 'WATERMARK'",
                      conn_id_tgt='RDS_DMUAT_DMCONFIG',
                      sql_tgt="SELECT MAX(CURRENT_WATERMARK_DATE) FROM DMCONFIG.PRESTG_DM_WMD_WATERMARKDATE WHERE SCHEMA_NAME = 'CCUSER'",
                      poke_interval=60,
                      dag=dag)
...
I restarted the web server, the scheduler, and the Airflow worker after adding this plugin on the worker node.
What am I missing here?
Upvotes: 0
Views: 2789
Reputation: 274
I ran into this problem as well when I tried to use an Airflow hook to connect to a Teradata database, so I read the Airflow source code. You can see the get_hook() function at /<your python path (for example /usr/lib64/python2.7/)>/site-packages/airflow/models/connection.py:
def get_hook(self):
    try:
        if self.conn_type == 'mysql':
            from airflow.hooks.mysql_hook import MySqlHook
            return MySqlHook(mysql_conn_id=self.conn_id)
        elif self.conn_type == 'google_cloud_platform':
            from airflow.contrib.hooks.bigquery_hook import BigQueryHook
            return BigQueryHook(bigquery_conn_id=self.conn_id)
        elif self.conn_type == 'postgres':
            from airflow.hooks.postgres_hook import PostgresHook
            return PostgresHook(postgres_conn_id=self.conn_id)
        elif self.conn_type == 'hive_cli':
            from airflow.hooks.hive_hooks import HiveCliHook
            return HiveCliHook(hive_cli_conn_id=self.conn_id)
        elif self.conn_type == 'presto':
            from airflow.hooks.presto_hook import PrestoHook
            return PrestoHook(presto_conn_id=self.conn_id)
        elif self.conn_type == 'hiveserver2':
            from airflow.hooks.hive_hooks import HiveServer2Hook
            return HiveServer2Hook(hiveserver2_conn_id=self.conn_id)
        elif self.conn_type == 'sqlite':
            from airflow.hooks.sqlite_hook import SqliteHook
            return SqliteHook(sqlite_conn_id=self.conn_id)
        elif self.conn_type == 'jdbc':
            from airflow.hooks.jdbc_hook import JdbcHook
            return JdbcHook(jdbc_conn_id=self.conn_id)
        elif self.conn_type == 'mssql':
            from airflow.hooks.mssql_hook import MsSqlHook
            return MsSqlHook(mssql_conn_id=self.conn_id)
        elif self.conn_type == 'oracle':
            from airflow.hooks.oracle_hook import OracleHook
            return OracleHook(oracle_conn_id=self.conn_id)
        elif self.conn_type == 'vertica':
            from airflow.contrib.hooks.vertica_hook import VerticaHook
            return VerticaHook(vertica_conn_id=self.conn_id)
        elif self.conn_type == 'cloudant':
            from airflow.contrib.hooks.cloudant_hook import CloudantHook
            return CloudantHook(cloudant_conn_id=self.conn_id)
        elif self.conn_type == 'jira':
            from airflow.contrib.hooks.jira_hook import JiraHook
            return JiraHook(jira_conn_id=self.conn_id)
        elif self.conn_type == 'redis':
            from airflow.contrib.hooks.redis_hook import RedisHook
            return RedisHook(redis_conn_id=self.conn_id)
        elif self.conn_type == 'wasb':
            from airflow.contrib.hooks.wasb_hook import WasbHook
            return WasbHook(wasb_conn_id=self.conn_id)
        elif self.conn_type == 'docker':
            from airflow.hooks.docker_hook import DockerHook
            return DockerHook(docker_conn_id=self.conn_id)
        elif self.conn_type == 'azure_data_lake':
            from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
            return AzureDataLakeHook(azure_data_lake_conn_id=self.conn_id)
        elif self.conn_type == 'azure_cosmos':
            from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
            return AzureCosmosDBHook(azure_cosmos_conn_id=self.conn_id)
        elif self.conn_type == 'cassandra':
            from airflow.contrib.hooks.cassandra_hook import CassandraHook
            return CassandraHook(cassandra_conn_id=self.conn_id)
        elif self.conn_type == 'mongo':
            from airflow.contrib.hooks.mongo_hook import MongoHook
            return MongoHook(conn_id=self.conn_id)
        elif self.conn_type == 'gcpcloudsql':
            from airflow.contrib.hooks.gcp_sql_hook import CloudSqlDatabaseHook
            return CloudSqlDatabaseHook(gcp_cloudsql_conn_id=self.conn_id)
    except Exception:
        pass
This means that if your connection's conn_type does not match any of the branches above (or the hook import fails), get_hook() falls through the bare except and implicitly returns None. Calling get_records() on that None is exactly what produces the AttributeError you are seeing.
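Because of the bare except/pass, the failure is completely silent. A small defensive wrapper (a sketch of my own, not part of Airflow) makes it explicit instead of letting the None propagate:

from airflow.hooks.base_hook import BaseHook

def get_hook_or_fail(conn_id):
    # Sketch: like Connection.get_hook(), but fail loudly instead of returning None.
    conn = BaseHook.get_connection(conn_id)
    hook = conn.get_hook()
    if hook is None:
        raise ValueError("conn_type %r of connection %r has no hook mapping"
                         % (conn.conn_type, conn_id))
    return hook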
How to resolve: adding your own hook is the best way in Airflow. Here is a sample I wrote for Teradata:
# cat teradata_hook.py
from builtins import str

import jaydebeapi

from airflow.hooks.dbapi_hook import DbApiHook


class TeradataJdbcHook(DbApiHook):
    conn_name_attr = 'teradata_conn_id'
    default_conn_name = 'teradata_default'
    supports_autocommit = True

    def get_conn(self):
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        host = conn.host
        url = 'jdbc:teradata://' + host + '/TMODE=TERA'
        login = conn.login
        psw = conn.password
        jdbc_driver_loc = '/opt/spark-2.3.1-bin-without-hadoop/jars/terajdbc4-16.20.00.06.jar,/opt/spark-2.3.1-bin-without-hadoop/jars/tdgssconfig-16.20.00.06.jar'
        jdbc_driver_name = "com.teradata.jdbc.TeraDriver"
        conn = jaydebeapi.connect(jclassname=jdbc_driver_name,
                                  url=url,
                                  driver_args=[str(login), str(psw)],
                                  jars=jdbc_driver_loc.split(","))
        return conn

    def set_autocommit(self, conn, autocommit):
        """
        Enable or disable autocommit for the given connection.

        :param conn: The connection.
        :param autocommit: Whether autocommit should be enabled.
        """
        conn.jconn.setAutoCommit(autocommit)
Then you can call this hook to connect to the Teradata database (or any other database that has a JDBC driver):
[root@myhost transfer]# cat h.py
import util

from airflow.hooks.base_hook import BaseHook
from teradata_hook import TeradataJdbcHook

sql = "SELECT COUNT(*) FROM TERADATA_TABLE where month_key='202009'"
conn_id = 'teradata_account@dbname'  # this is my environment's id format
hook = TeradataJdbcHook(conn_id)
records = hook.get_records(sql)
print(records)
if str(records[0][0]) in ('0', ''):
    print("No Records")
else:
    print("Has Records")
It returns the result: [(7734133,)]
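Applied back to the sensor in the question, the same idea works without a custom hook if both databases are Oracle (as the host/port in the question's log suggest): build the hooks explicitly instead of relying on get_hook(), so the hook class no longer depends on the conn_type stored in the metadata DB. A minimal sketch:

from airflow.hooks.oracle_hook import OracleHook

# Sketch: explicit hooks for the two connections used by SensorWatermarkDt,
# assuming both point at Oracle databases.
hook_src = OracleHook(oracle_conn_id='CCUSER_SOURCE_GWPROD_RPT')
hook_tgt = OracleHook(oracle_conn_id='RDS_DMUAT_DMCONFIG')
print(hook_src.get_records("SELECT 1 FROM dual"))
print(hook_tgt.get_records("SELECT 1 FROM dual"))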
Upvotes: 1