Reputation: 103
I have a PythonOperator that uses boto3 to connect to AWS and download files from S3. My Airflow cluster is set up locally. When I run the DAG with the DebugExecutor,
everything works correctly, but with the LocalExecutor
the task fails with the error below. Is there a way to debug this? The logs are not very useful.
--------------------------------------------------------------------------------
[2021-06-11 13:48:12,977] {taskinstance.py:867} INFO - Starting attempt 1 of 2
[2021-06-11 13:48:12,977] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2021-06-11 13:48:12,997] {taskinstance.py:887} INFO - Executing <Task(PythonOperator): download_from_s3> on 2016-11-01T00:00:00+00:00
[2021-06-11 13:48:13,000] {standard_task_runner.py:53} INFO - Started process 90339 to run task
[2021-06-11 13:48:13,102] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: download_dags.download_from_s3 2016-11-01T00:00:00+00:00 [running]>
[2021-06-11 13:48:13,137] {logging_mixin.py:112} INFO - [2021-06-11 13:48:13,137] {sync_dag.py:48} INFO - Starting download
[2021-06-11 13:48:22,924] {logging_mixin.py:112} INFO - [2021-06-11 13:48:22,923] {local_task_job.py:103} INFO - Task exited with return code Negsignal.SIGABRT
[2021-06-11 13:52:29,212] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: download_dags.download_from_s3 2016-11-01T00:00:00+00:00 [queued]>
[2021-06-11 13:52:29,247] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: download_dags.download_from_s3 2016-11-01T00:00:00+00:00 [queued]>
[2021-06-11 13:52:29,247] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2021-06-11 13:52:29,247] {taskinstance.py:867} INFO - Starting attempt 2 of 2
[2021-06-11 13:52:29,247] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2021-06-11 13:52:29,253] {taskinstance.py:887} INFO - Executing <Task(PythonOperator): download_from_s3> on 2016-11-01T00:00:00+00:00
[2021-06-11 13:52:29,256] {standard_task_runner.py:53} INFO - Started process 92685 to run task
[2021-06-11 13:52:29,336] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: download_dags.download_from_s3 2016-11-01T00:00:00+00:00 [running]>
[2021-06-11 13:52:29,370] {logging_mixin.py:112} INFO - [2021-06-11 13:52:29,369] {sync_dag.py:50} INFO - Starting download
[2021-06-11 13:52:39,191] {logging_mixin.py:112} INFO - [2021-06-11 13:52:39,190] {local_task_job.py:103} INFO - Task exited with return code Negsignal.SIGABRT
import logging
import os

import boto3


def download_directory_from_s3(bucket_name, remote_directory_name, path_to_download=None):
    try:
        logging.info("Starting download")
        s3_resource = boto3.resource('s3',
                                     aws_access_key_id=os.getenv("ACCESS_KEY"),
                                     aws_secret_access_key=os.getenv("SECRET_KEY"))
        bucket = s3_resource.Bucket(bucket_name)
        # boto3 is lazy: credentials are only checked on the first request below
        logging.info("Authentication success")
        for obj in bucket.objects.filter(Prefix=remote_directory_name):
            # Create the local parent directory for each key before downloading
            if not os.path.exists(os.path.dirname(os.path.join(path_to_download, obj.key))):
                os.makedirs(os.path.dirname(os.path.join(path_to_download, obj.key)))
            bucket.download_file(obj.key, os.path.join(path_to_download, obj.key))
    except Exception as es:
        # The message needs a %s placeholder for the extra argument
        logging.exception("Exception: %s", es)
    return "DownloadSuccess"
with DAG(dag_id='download_dags', default_args=default_args, schedule_interval='@once') as dag:
    download_kwargs = {
        'bucket_name': 'bucket_name',
        'remote_directory_name': 'path',
        'path_to_download': 'local_path'
    }
    step1 = PythonOperator(
        task_id='download_from_s3',
        python_callable=download_directory_from_s3,
        op_kwargs=download_kwargs
    )
Upvotes: 2
Views: 979
Reputation: 15979
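You are probably running it on macOS. There, the Objective-C runtime can abort a forked child process that touches certain system frameworks, which would explain a SIGABRT under LocalExecutor (which runs tasks in forked processes) but not under DebugExecutor (which runs them in a single process). Setting:
$ export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
in the shell before starting the Airflow scheduler should fix the issue (reference).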
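As for debugging when the task log only shows the signal: one option, assuming the crash is tied to running in a forked process, is to call the function in a forked child outside Airflow with Python's faulthandler enabled, so that the Python traceback at the moment of the abort is written to a file. This is only a sketch. The helper names (run_in_forked_child, describe_exit) and the crash log path are made up for illustration and are not part of Airflow or boto3, and os.abort stands in for the real task callable.

```python
import faulthandler
import multiprocessing as mp
import os
import signal
import tempfile

# Hypothetical location for the crash report; any writable path works.
DEFAULT_CRASH_LOG = os.path.join(tempfile.gettempdir(), "task_crash.log")


def _child(func, args, kwargs, crash_log):
    # faulthandler writes the Python traceback straight to this file
    # if the process receives SIGABRT, SIGSEGV, SIGBUS, SIGFPE or SIGILL.
    with open(crash_log, "w") as fh:
        faulthandler.enable(file=fh, all_threads=True)
        func(*args, **kwargs)
        faulthandler.disable()


def run_in_forked_child(func, args=(), kwargs=None, crash_log=DEFAULT_CRASH_LOG):
    """Run func in a forked child process and return the child's exit code.

    A negative exit code -N means the child was killed by signal N.
    """
    ctx = mp.get_context("fork")  # "fork" is not available on Windows
    proc = ctx.Process(target=_child, args=(func, args, kwargs or {}, crash_log))
    proc.start()
    proc.join()
    return proc.exitcode


def describe_exit(exitcode):
    """Turn a multiprocessing exit code into a readable string."""
    if exitcode is not None and exitcode < 0:
        return signal.Signals(-exitcode).name
    return str(exitcode)


if __name__ == "__main__":
    # os.abort is a stand-in for the real callable, e.g.
    # run_in_forked_child(download_directory_from_s3, kwargs=download_kwargs)
    code = run_in_forked_child(os.abort)
    print(describe_exit(code))
    with open(DEFAULT_CRASH_LOG) as fh:
        print(fh.read())
```

If the forked run aborts while a plain function call in the same shell does not, that points at fork safety rather than at the S3 code, and the traceback in the crash log shows which Python line was executing when the signal arrived.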
Upvotes: 1