Reputation: 122
Running this DAG in Airflow fails with the error Task exited with return code Negsignal.SIGABRT.
I am not sure what I have done wrong.
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago
SNOWFLAKE_CONN_ID = 'snowflake_conn'
# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
# SNOWFLAKE_SCHEMA = 'schema_name'
#SNOWFLAKE_STAGE = 'stage_name'
SNOWFLAKE_WAREHOUSE = 'SF_TUTS_WH'
SNOWFLAKE_DATABASE = 'KAFKA_DB'
SNOWFLAKE_ROLE = 'sysadmin'
SNOWFLAKE_SAMPLE_TABLE = 'sample_table'
CREATE_TABLE_SQL_STRING = (
    f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);"
)
SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"
SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(0, 10)]

default_args = {
    'owner': 'airflow',
}

dag = DAG(
    'example_snowflake',
    default_args=default_args,
    start_date=days_ago(2),
    tags=['example'],
)

snowflake_op_sql_str = SnowflakeOperator(
    task_id='snowflake_op_sql_str',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql=CREATE_TABLE_SQL_STRING,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    # schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)

snowflake_op_with_params = SnowflakeOperator(
    task_id='snowflake_op_with_params',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    # schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)

snowflake_op_sql_list = SnowflakeOperator(
    task_id='snowflake_op_sql_list', dag=dag, snowflake_conn_id=SNOWFLAKE_CONN_ID, sql=SQL_LIST
)

snowflake_op_sql_str >> [
    snowflake_op_with_params,
    snowflake_op_sql_list,
]
The Airflow task log is below:
Reading local file: /Users/aashayjain/airflow/logs/snowflake_test/snowflake_op_with_params/2021-02-02T13:51:18.229233+00:00/1.log
[2021-02-02 19:21:38,880] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,887] {taskinstance.py:1018} INFO - Starting attempt 1 of 1
[2021-02-02 19:21:38,887] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,892] {taskinstance.py:1038} INFO - Executing <Task(SnowflakeOperator): snowflake_op_with_params> on 2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,895] {standard_task_runner.py:51} INFO - Started process 16510 to run task
[2021-02-02 19:21:38,901] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'snowflake_test', 'snowflake_op_with_params', '2021-02-02T13:51:18.229233+00:00', '--job-id', '7', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/snowflake_test.py', '--cfg-path', '/var/folders/6h/1pzt4pbx6h32h6p5v503wws00000gp/T/tmp1w61m38s']
[2021-02-02 19:21:38,903] {standard_task_runner.py:76} INFO - Job 7: Subtask snowflake_op_with_params
[2021-02-02 19:21:38,933] {logging_mixin.py:103} INFO - Running <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [running]> on host 1.0.0.127.in-addr.arpa
[2021-02-02 19:21:38,954] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=snowflake_test
AIRFLOW_CTX_TASK_ID=snowflake_op_with_params
AIRFLOW_CTX_EXECUTION_DATE=2021-02-02T13:51:18.229233+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)
[2021-02-02 19:21:38,961] {base.py:74} INFO - Using connection to: id: snowflake_conn. Host: uva00063.us-east-1.snowflakecomputing.com, Port: None, Schema: , Login: aashay, Password: XXXXXXXX, extra: XXXXXXXX
[2021-02-02 19:21:38,963] {connection.py:218} INFO - Snowflake Connector for Python Version: 2.3.7, Python Version: 3.7.3, Platform: Darwin-19.5.0-x86_64-i386-64bit
[2021-02-02 19:21:38,964] {connection.py:769} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2021-02-02 19:21:38,964] {connection.py:785} INFO - Setting use_openssl_only mode to False
[2021-02-02 19:21:38,996] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGABRT
apache-airflow==2.0.0
python 3.7.3
Looking forward to some help with this. Let me know if I need to provide any more details about the code or my Airflow setup.
Upvotes: 2
Views: 8324
Reputation: 13
In case someone ends up here when googling a SIGABRT or SIGSEGV error in Airflow: you might find the discussion below useful. At least I would have, before I wasted some time trying to find the source of the problem.
https://github.com/apache/airflow/discussions/22273#discussioncomment-5379147
Upvotes: 0
Reputation: 19
I think the issue you are facing has to do with OCSP checking.
Add insecure_mode=True to the Snowflake connector call to disable OCSP checking:
snowflake.connector.connect(
    insecure_mode=True,
)
But "insecure_mode=True" is needed only when running inside Airflow (Running as a pyscript works fine) . I don't know why is that.
Note: export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES didn't work for me, because I didn't get the "may have been in progress in another thread when fork() was called" error, which is the one that usually ends in a Negsignal.SIGABRT exception.
Upvotes: 0
Reputation: 31
This helped me too on macOS:
nano .zshrc
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
Upvotes: 3
Reputation: 166
I ran into the same error on macOS, which looks like the OS you are using based on the local log file path.
Adding the following to my Airflow scheduler session fixed the problem:
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
The cause is described in the Airflow docs:
This error occurs because of added security to restrict multiprocessing & multithreading in Mac OS High Sierra and above.
The doc refers to version 1.10.x - but based on local test environments this also resolves the error for v2.x.x.
kaxil's comment on the following issue is what guided me to the relevant Airflow docs page: https://github.com/apache/airflow/issues/12808#issuecomment-738854764
Upvotes: 3
Reputation: 15931
You are executing:
snowflake_op_with_params = SnowflakeOperator(
    task_id='snowflake_op_with_params',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    # schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)
This tries to run the SQL in SQL_INSERT_STATEMENT.
So it executes:
f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"
which gives:
INSERT INTO sample_table VALUES ('name', %(id)s)
As shown in your own log:
[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)
This is not a valid SQL statement.
I can't really tell what SQL you wanted to execute. Based on SQL_LIST, I can assume that %(id)s is supposed to be an id of integer type.
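For comparison, a small sketch using just the constants from the question shows what each variant produces:
SNOWFLAKE_SAMPLE_TABLE = 'sample_table'
SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"

# The raw statement still contains the unresolved placeholder,
# which is exactly the string that shows up in the task log:
print(SQL_INSERT_STATEMENT)
# INSERT INTO sample_table VALUES ('name', %(id)s)

# SQL_LIST substitutes a concrete integer up front, producing literal SQL:
SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(0, 10)]
print(SQL_LIST[0])
# INSERT INTO sample_table VALUES ('name', 0)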
Upvotes: 1