RGregg

Reputation: 135

Airflow Failed: ParseException line 2:0 cannot recognize input near

I'm trying to run a test task on Airflow but I keep getting the following error:

FAILED: ParseException 2:0 cannot recognize input near 'create_import_table_fct_latest_values' '.' 'hql'

Here is my Airflow DAG file:

import airflow
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.models import DAG

args = {
    'owner': 'raul',
    'start_date': datetime(2018, 11, 12),
    'provide_context': True,
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG('opus_data', 
    default_args=args,
    max_active_runs=6,
    schedule_interval="@daily"
)

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql ',
    hiveconf_jinja_translate=True,
    dag=dag
    )

deps = {}

# Explicitly define the dependencies in the DAG
for downstream, upstream_list in deps.items():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)

Here is the content of my HQL file, in case that is the issue, as I can't figure it out:

I'm testing the connection to see whether the table gets created or not; then I'll try to LOAD DATA, hence the LOAD DATA statement is commented out.
CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product          STRING,
    id_model            STRING,
    id_attribute        STRING,
    attribute_value     STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED ',';

#LOAD DATA LOCAL INPATH
#'/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
#OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;

Upvotes: 1

Views: 1389

Answers (2)

RGregg

Reputation: 135

I managed to find the answer to my issue.

It was related to the path my HiveOperator was loading the file from. Since no Variable had been defined to tell Airflow where to look, I was getting the error I mentioned in my post.

Once I defined it through the webserver interface (Admin -> Variables), my DAG started to work properly.
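
For reference, a minimal sketch of how such a Variable can be read back from the DAG code; the variable name hql_folder is just an illustration here, not something fixed by Airflow:

from airflow.models import Variable

# "hql_folder" is an assumed Variable name pointing at the directory
# holding the HQL scripts; it has to be created first, e.g. in the web UI
hql_folder = Variable.get("hql_folder")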

I also made a change to my DAG code regarding the file location, purely for organization, and this is how my HiveOperator looks now:

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='hql/create_import_table_fct_latest_values2.hql',
    hiveconf_jinja_translate=True,
    dag=dag
    )

Thanks to @panov.st, who helped me in person to identify my issue.

Upvotes: 0

leftjoin

Reputation: 38335

In the HQL file it should be FIELDS TERMINATED BY ',':

CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product          STRING,
    id_model            STRING,
    id_attribute        STRING,
    attribute_value     STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

And comments in an HQL file should start with --, not #.
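
Applied to the commented-out LOAD DATA block from the question, that looks like:

-- LOAD DATA LOCAL INPATH
-- '/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
-- OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;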

Also, this seems incorrect and is causing the exception: hql='create_import_table_fct_latest_values.hql '. Note the trailing space: because of it the value no longer ends in .hql, so Airflow does not treat it as a script file to load, and the literal string gets sent to Hive as a query, which matches the ParseException above.
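
With the trailing space removed (and the file placed where Airflow looks for it, for example relative to the DAG folder), the operator from the question becomes:

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql',  # no trailing space
    hiveconf_jinja_translate=True,
    dag=dag
)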

Have a look at this example:

import os

# Build the full path to the HQL file, relative to this DAG file
# (source['hql'] is the file name taken from the linked example's config)
hql_file_path = os.path.join(os.path.dirname(__file__), source['hql'])
print(hql_file_path)

run_hive_query = HiveOperator(
    task_id='run_hive_query',
    dag=dag,
    hql="""
    {{ local_hive_settings }}
    """ + "\n " + open(hql_file_path, 'r').read()
)

See here for more details.

Or put the HQL directly into the hql parameter:

hql='CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data ...'
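
Using the question's own DDL with the fixed delimiter clause, a complete inline version would look like this (a sketch; the operator arguments are taken from the question):

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql="""
        CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
            id_product          STRING,
            id_model            STRING,
            id_attribute        STRING,
            attribute_value     STRING
        ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
    """,
    dag=dag
)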

Upvotes: 1
