Reputation: 113
I am running a Sqoop import that reads a table from a MySQL database and loads it into HDFS. I have created the DAG below to perform this:
from airflow.models import DAG
from airflow.contrib.operators.sqoop_operator import SqoopOperator
from airflow.utils.dates import days_ago

Dag_Sqoop_Import = DAG(dag_id="SqoopImport",
                       schedule_interval="* * * * *",
                       start_date=days_ago(2))

sqoop_mysql_import = SqoopOperator(conn_id="sqoop_local",
                                   table="shipmethod",
                                   cmd_type="import",
                                   target_dir="/airflow_sqoopImport",
                                   num_mappers=1,
                                   task_id="SQOOP_Import",
                                   dag=Dag_Sqoop_Import)

sqoop_mysql_import
I have also created a SqoopImport connection in Airflow as below.
When I trigger the job, I expect it to run a command like this:
sqoop import --connect jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1 --driver com.mysql.jdbc.Driver --username xxxx --password xxxxxx --autoreset-to-one-mapper --table workorder --target-dir /user/adminn/workorder
But the logs show that it actually runs the following command:
Executing command: sqoop import --username xxxx --password MASKED --num-mappers 1 --connect jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1 --target-dir /airflow_sqoopImport --as-textfile --table shipmethod
The DAG then fails with the error below. I know the cause: I need to add the parameter --driver com.mysql.jdbc.Driver, which should resolve the error. I am struggling to add it, so can you please let me know where I am going wrong?
ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@5906ebcb is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.'
Replies Appreciated, thanks.
Upvotes: 0
Views: 145
Reputation: 5110
You should pass the driver class as an argument to the operator, not to the connection:
sqoop_mysql_import = SqoopOperator(conn_id="sqoop_local",
                                   table="shipmethod",
                                   cmd_type="import",
                                   target_dir="/airflow_sqoopImport",
                                   driver="com.mysql.jdbc.Driver",
                                   num_mappers=1,
                                   task_id="SQOOP_Import",
                                   dag=Dag_Sqoop_Import)
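To see why the flag was missing from the logged command, here is a rough, self-contained sketch of how an optional driver argument could end up on the Sqoop command line. This is not Airflow's actual code: build_sqoop_import_cmd is a hypothetical helper written only for illustration, and the position of --driver in the output is an assumption. It only shows the general pattern that the flag appears when a driver value is passed explicitly.

```python
from typing import List, Optional


def build_sqoop_import_cmd(
    connect: str,
    table: str,
    target_dir: str,
    num_mappers: int = 1,
    driver: Optional[str] = None,
) -> List[str]:
    """Hypothetical helper (not part of Airflow): build a Sqoop import argument list."""
    cmd = ["sqoop", "import", "--num-mappers", str(num_mappers), "--connect", connect]
    # The --driver flag is appended only when a driver class was supplied.
    if driver:
        cmd += ["--driver", driver]
    cmd += ["--target-dir", target_dir, "--as-textfile", "--table", table]
    return cmd


JDBC_URL = "jdbc:mysql://192.168.0.15:3306/adventureworks?characterEncoding=latin1"

# Without a driver argument, no --driver flag is emitted (as in the failing log).
without_driver = build_sqoop_import_cmd(JDBC_URL, "shipmethod", "/airflow_sqoopImport")

# With the driver argument, the flag and class name are added to the command.
with_driver = build_sqoop_import_cmd(
    JDBC_URL, "shipmethod", "/airflow_sqoopImport", driver="com.mysql.jdbc.Driver"
)

print(" ".join(without_driver))
print(" ".join(with_driver))
```

After the change, the "Executing command:" line in the task log should include --driver com.mysql.jdbc.Driver, which is a quick way to confirm the argument was picked up.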
Upvotes: 1