Aashu

Reputation: 122

Task exited with return code Negsignal.SIGABRT. Airflow task fails with SnowflakeOperator

Running this DAG in Airflow fails with the error Task exited with return code Negsignal.SIGABRT.

I am not sure what I have done wrong:


    from airflow import DAG
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
    from airflow.utils.dates import days_ago
    
    SNOWFLAKE_CONN_ID = 'snowflake_conn'
    # TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
    # SNOWFLAKE_SCHEMA = 'schema_name'
    #SNOWFLAKE_STAGE = 'stage_name'
    SNOWFLAKE_WAREHOUSE = 'SF_TUTS_WH'
    SNOWFLAKE_DATABASE = 'KAFKA_DB'
    SNOWFLAKE_ROLE = 'sysadmin'
    SNOWFLAKE_SAMPLE_TABLE = 'sample_table'
    
    CREATE_TABLE_SQL_STRING = (
        f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);"
    )
    
    SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"
    SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(0, 10)]
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_snowflake',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    snowflake_op_sql_str = SnowflakeOperator(
        task_id='snowflake_op_sql_str',
        dag=dag,
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql=CREATE_TABLE_SQL_STRING,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
      #  schema=SNOWFLAKE_SCHEMA,
        role=SNOWFLAKE_ROLE,
    )
    
    snowflake_op_with_params = SnowflakeOperator(
        task_id='snowflake_op_with_params',
        dag=dag,
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql=SQL_INSERT_STATEMENT,
        parameters={"id": 56},
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
     #   schema=SNOWFLAKE_SCHEMA,
        role=SNOWFLAKE_ROLE,
    )
    
    
    snowflake_op_sql_list = SnowflakeOperator(
        task_id='snowflake_op_sql_list', dag=dag, snowflake_conn_id=SNOWFLAKE_CONN_ID, sql=SQL_LIST
    )
    
    snowflake_op_sql_str >> [
        snowflake_op_with_params,
        snowflake_op_sql_list,
    ]

Getting logs in Airflow as below:

 Reading local file: /Users/aashayjain/airflow/logs/snowflake_test/snowflake_op_with_params/2021-02-02T13:51:18.229233+00:00/1.log
[2021-02-02 19:21:38,880] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:1017} INFO - 
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,887] {taskinstance.py:1018} INFO - Starting attempt 1 of 1
[2021-02-02 19:21:38,887] {taskinstance.py:1019} INFO - 
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,892] {taskinstance.py:1038} INFO - Executing <Task(SnowflakeOperator): snowflake_op_with_params> on 2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,895] {standard_task_runner.py:51} INFO - Started process 16510 to run task
[2021-02-02 19:21:38,901] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'snowflake_test', 'snowflake_op_with_params', '2021-02-02T13:51:18.229233+00:00', '--job-id', '7', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/snowflake_test.py', '--cfg-path', '/var/folders/6h/1pzt4pbx6h32h6p5v503wws00000gp/T/tmp1w61m38s']
[2021-02-02 19:21:38,903] {standard_task_runner.py:76} INFO - Job 7: Subtask snowflake_op_with_params
[2021-02-02 19:21:38,933] {logging_mixin.py:103} INFO - Running <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [running]> on host 1.0.0.127.in-addr.arpa
[2021-02-02 19:21:38,954] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=snowflake_test
AIRFLOW_CTX_TASK_ID=snowflake_op_with_params
AIRFLOW_CTX_EXECUTION_DATE=2021-02-02T13:51:18.229233+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)
[2021-02-02 19:21:38,961] {base.py:74} INFO - Using connection to: id: snowflake_conn. Host: uva00063.us-east-1.snowflakecomputing.com, Port: None, Schema: , Login: aashay, Password: XXXXXXXX, extra: XXXXXXXX
[2021-02-02 19:21:38,963] {connection.py:218} INFO - Snowflake Connector for Python Version: 2.3.7, Python Version: 3.7.3, Platform: Darwin-19.5.0-x86_64-i386-64bit
[2021-02-02 19:21:38,964] {connection.py:769} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2021-02-02 19:21:38,964] {connection.py:785} INFO - Setting use_openssl_only mode to False
[2021-02-02 19:21:38,996] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGABRT

apache-airflow==2.0.0

Python 3.7.3

Looking forward to help with this. Let me know if I need to provide any more details regarding the code or Airflow.

Upvotes: 2

Views: 8324

Answers (5)

Kacper

Reputation: 13

In case someone ends up here when Googling a SIGABRT or SIGSEGV error in Airflow, you might find this useful. At least I would have, before I wasted some time trying to find the source of the problem:

https://github.com/apache/airflow/discussions/22273#discussioncomment-5379147

Upvotes: 0

Mohamed Ajmal

Reputation: 19

I think the issue you are facing has to do with OCSP checking.

Add "insecure_mode=True" in snowflake connector to disable OCSP checking

    import snowflake.connector

    conn = snowflake.connector.connect(
        insecure_mode=True,  # disables OCSP certificate checking
        # account, user, password, etc. as before
    )

But "insecure_mode=True" is needed only when running inside Airflow (Running as a pyscript works fine) . I don't know why is that.

Note: export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES didn't work for me, because I didn't get the "... may have been in progress in another thread when fork() was called" error, which is the case that usually throws the Negsignal.SIGABRT exception.

Upvotes: 0

Christian Brauchli

Reputation: 31

This helped me too on macOS:

  1. Open a terminal and open .zshrc (or .bash_profile) with nano .zshrc
  2. Add this line to the profile: export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
  3. Restart the scheduler and web server.
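
A quick sanity check (just a sketch): before restarting, confirm the variable is actually visible to Python in the same shell session you start the scheduler from:

    import os

    # Should print 'YES' after sourcing the updated profile
    print(os.environ.get('OBJC_DISABLE_INITIALIZE_FORK_SAFETY'))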

Upvotes: 3

phil-in-code

Reputation: 166

I ran into the same error on macOS, which looks like the OS you are using based on the local file path in your logs.

Adding the following to my Airflow scheduler session fixed the problem:

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

The cause is described in the Airflow docs:

This error occurs because of added security to restrict multiprocessing & multithreading in Mac OS High Sierra and above.

The doc refers to version 1.10.x, but based on local test environments this also resolves the error for v2.x.x.

kaxil's comment in the following issue is what guided me to the relevant Airflow page: https://github.com/apache/airflow/issues/12808#issuecomment-738854764
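
For context, here is a minimal sketch of the underlying failure mode (assumptions: macOS High Sierra or later, OBJC_DISABLE_INITIALIZE_FORK_SAFETY not set; getproxies is just one example of a call that touches Objective-C system frameworks):

    import os
    import urllib.request

    # Touch a macOS system framework in the parent process...
    urllib.request.getproxies()

    pid = os.fork()
    if pid == 0:
        # ...then use it again in the forked child. The Objective-C runtime
        # may abort the child here, which Airflow's task runner reports as
        # "Task exited with return code Negsignal.SIGABRT".
        urllib.request.getproxies()
        os._exit(0)
    os.waitpid(pid, 0)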

Upvotes: 3

Elad Kalif

Reputation: 15931

You are executing:

snowflake_op_with_params = SnowflakeOperator(
    task_id='snowflake_op_with_params',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
 #   schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)

This tries to run the SQL in SQL_INSERT_STATEMENT, so it executes:

f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"

which gives:

INSERT INTO sample_table VALUES ('name', %(id)s)

As shown in your own log:

[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)

This is not a valid SQL statement.

I can't really tell what SQL you wanted to execute. Based on SQL_LIST, I can assume that %(id)s is supposed to be an id of integer type.
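
To see the difference, here is how the two render (a sketch reusing the question's own definitions):

    SNOWFLAKE_SAMPLE_TABLE = 'sample_table'
    SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"

    # SQL_LIST fills the placeholder with Python %-formatting up front,
    # so each statement is already concrete SQL:
    SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(0, 10)]
    print(SQL_LIST[0])           # INSERT INTO sample_table VALUES ('name', 0)

    # The raw statement still contains the %(id)s placeholder and relies on
    # parameters={"id": 56} being bound when the operator executes it:
    print(SQL_INSERT_STATEMENT)  # INSERT INTO sample_table VALUES ('name', %(id)s)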

Upvotes: 1
