Reputation: 3729
I have an Airflow script that tries to insert data from one table into another; I am using an Amazon Redshift database. When triggered, the script below does not execute: the task's status stays at 'no status' in the Graph View and no other error is shown.
## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")

#######################
## Login to DB
def db_login():
    global db_conn
    try:
        db_conn = psycopg2.connect(
            " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection Task Complete: Connected to DB')
    return (db_conn)

#######################
def insert_data():
    cur = db_conn.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2 ;""")
    db_conn.commit()
    print('ETL Task Complete')

def job_run():
    db_login()
    insert_data()

##########################################
t1 = PythonOperator(
    task_id='DBConnect',
    python_callable=job_run,
    bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)
t1
Could anyone help me find where the problem might be? Thanks.
Updated Code (05/28)
## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 23, 12),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")

#######################
## Login to DB
def data_warehouse_login():
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(
            " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
    print('Connected successfully')
    return (dwh_connection)

def insert_data():
    cur = dwh_connection.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
    dwh_connection.commit()
    print('Task Complete: Insert success')

def job_run():
    data_warehouse_login()
    insert_data()

##########################################
t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run(),
    # bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)
t1
Log messages when running the script:
[2018-05-28 11:36:45,300] {jobs.py:343} DagFileProcessor26 INFO - Started process (PID=26489) to work on /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:45,306] {jobs.py:534} DagFileProcessor26 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
[2018-05-28 11:36:45,310] {jobs.py:1521} DagFileProcessor26 INFO - Processing file /Users/user/airflow/dags/sample.py for tasks to queue
[2018-05-28 11:36:45,310] {models.py:167} DagFileProcessor26 INFO - Filling up the DagBag from /Users/user/airflow/dags/sample.py
/Users/user/anaconda3/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
""")
Task Complete: Insert success
[2018-05-28 11:36:50,964] {jobs.py:1535} DagFileProcessor26 INFO - DAG(s) dict_keys(['latest_only', 'example_python_operator', 'test_utils', 'example_bash_operator', 'example_short_circuit_operator', 'example_branch_operator', 'tutorial', 'example_passing_params_via_test_command', 'latest_only_with_trigger', 'example_xcom', 'example_http_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_branch_dop_operator_v3', 'example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2', 'example_trigger_controller_dag', 'insert_data2']) retrieved from /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:51,159] {jobs.py:1169} DagFileProcessor26 INFO - Processing example_subdag_operator
[2018-05-28 11:36:51,167] {jobs.py:566} DagFileProcessor26 INFO - Skipping SLA check for <DAG: example_subdag_operator> because no tasks in DAG have SLAs
[2018-05-28 11:36:51,170] {jobs.py:1169} DagFileProcessor26 INFO - Processing sample_dag
[2018-05-28 11:36:51,174] {jobs.py:354} DagFileProcessor26 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1171, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 776, in create_dag_run
    if next_start <= now:
TypeError: '<=' not supported between instances of 'NoneType' and 'datetime.datetime'
Log from the Graph View:
*** Log file isn't local.
*** Fetching here: http://:8793/log/sample_dag/DWH_Connect/2018-05-28T12:23:57.595234
*** Failed to fetch log file from worker.
*** Reading remote logs...
*** Unsupported remote log location.
Upvotes: 1
Views: 3024
Reputation: 8239
To extend the answer kaxil provided: you should be using an IDE to develop for Airflow. PyCharm works fine for me.
That said, please make sure to look up the available parameters in the docs next time. For PythonOperator, see the docs here:
https://airflow.apache.org/code.html#airflow.operators.PythonOperator
Signature looks like:
class airflow.operators.PythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)
and for BashOperator, see the docs here:
https://airflow.apache.org/code.html#airflow.operators.BashOperator
Signature is:
class airflow.operators.BashOperator(bash_command, xcom_push=False, env=None, output_encoding='utf-8', *args, **kwargs)
Note the parameters you have been using: python_callable belongs to PythonOperator, while bash_command belongs to BashOperator.
My recommendation is to dig through the documentation a bit before using an operator.
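Besides reading the docs, you can ask Python which keyword arguments a constructor explicitly declares using inspect.signature. The sketch below is self-contained: PythonOperatorStandIn and BashOperatorStandIn are hypothetical classes that copy the documented signatures quoted above, so it runs without Airflow. With Airflow installed you would inspect PythonOperator.__init__ and BashOperator.__init__ directly. It also shows why passing bash_command to a PythonOperator is not rejected by Python itself: the documented signature ends in **kwargs, which quietly collects it.

```python
import inspect


# Hypothetical stand-ins mirroring the documented constructor signatures,
# so this sketch runs without Airflow installed.
class PythonOperatorStandIn:
    def __init__(self, python_callable, op_args=None, op_kwargs=None,
                 provide_context=False, templates_dict=None,
                 templates_exts=None, *args, **kwargs):
        self.python_callable = python_callable
        self.extra_kwargs = kwargs  # anything not declared above lands here


class BashOperatorStandIn:
    def __init__(self, bash_command, xcom_push=False, env=None,
                 output_encoding='utf-8', *args, **kwargs):
        self.bash_command = bash_command


def named_params(cls):
    """Return the explicitly named constructor parameters (no self/*args/**kwargs)."""
    sig = inspect.signature(cls.__init__)
    return [
        name for name, p in sig.parameters.items()
        if name != 'self'
        and p.kind not in (inspect.Parameter.VAR_POSITIONAL,
                           inspect.Parameter.VAR_KEYWORD)
    ]


print('bash_command' in named_params(PythonOperatorStandIn))  # False
print('bash_command' in named_params(BashOperatorStandIn))    # True

# No TypeError: the unexpected keyword is silently collected into **kwargs.
op = PythonOperatorStandIn(python_callable=print, bash_command='echo hi')
print(op.extra_kwargs)  # {'bash_command': 'echo hi'}
```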
EDIT
After seeing the updated code, there is one thing left:
When setting python_callable in a task, pass the function without parentheses. Otherwise the function is called right away, while the DAG file is being parsed, and its return value is passed instead of the function itself (which is very unintuitive if you don't know about it). So your code should look like this:
t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run,
    dag=dag)
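The difference between passing job_run and job_run() can be shown without Airflow. In this minimal sketch, FakeOperator is a hypothetical class that, like PythonOperator, stores a callable and only runs it when the task executes; job_run here is a stand-in that just records that it ran.

```python
# Records every time the stand-in job actually runs.
calls = []


def job_run():
    # Stand-in for the real ETL job.
    calls.append('job_run')


class FakeOperator:
    """Hypothetical operator that stores a callable and runs it on execute()."""

    def __init__(self, python_callable):
        self.python_callable = python_callable

    def execute(self):
        # The stored callable runs only when the task executes.
        return self.python_callable()


# Wrong: job_run() runs right here, while the module is being imported,
# and FakeOperator receives job_run's return value (None).
wrong = FakeOperator(python_callable=job_run())
print(calls)                  # ['job_run']
print(wrong.python_callable)  # None

calls.clear()

# Right: pass the function object; nothing runs until execute() is called.
right = FakeOperator(python_callable=job_run)
print(calls)                  # []
right.execute()
print(calls)                  # ['job_run']
```

This matches the log in the question: "Task Complete: Insert success" is printed by the DagFileProcessor while it fills the DagBag, i.e. the insert ran during parsing rather than as a task.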
Upvotes: 1
Reputation: 18874
Instead of having only a PythonOperator, you need to have a BashOperator and a PythonOperator.
You are getting the error because PythonOperator doesn't have a bash_command argument:
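from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

t1 = PythonOperator(
    task_id='DBConnect',
    python_callable=db_login,
    dag=dag
)

# task_id may only contain letters, digits, dashes, dots and underscores
t2 = BashOperator(
    task_id='Run_Python_File',
    bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag
)

# t2 runs after t1
t1 >> t2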
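The t1 >> t2 line sets the execution order: t2 is downstream of t1. Airflow operators support this by overloading the >> operator, roughly by calling set_downstream and returning the right-hand task so chains work. The sketch below illustrates that mechanism with a hypothetical Task class; it is not Airflow's own code, and the Cleanup task exists only to show chaining.

```python
class Task:
    """Hypothetical minimal task illustrating how `t1 >> t2` can set ordering."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

    def __rshift__(self, other):
        # t1 >> t2 means "t2 runs after t1"; returning other allows chaining.
        self.set_downstream(other)
        return other


t1 = Task('DBConnect')
t2 = Task('Run_Python_File')
t3 = Task('Cleanup')  # hypothetical extra task, only to show chaining

# Evaluated left to right: (t1 >> t2) returns t2, then t2 >> t3.
t1 >> t2 >> t3
print([t.task_id for t in t1.downstream])  # ['Run_Python_File']
print([t.task_id for t in t2.downstream])  # ['Cleanup']
```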
Upvotes: 1