Reputation: 171
I'm running Airflow 1.9 on Kubernetes in AWS. I would like the logs to go to S3, since the Airflow containers themselves are not long-lived.
I've read the various threads and documents that describe the process, but I still cannot get it working. First, here is a test that demonstrates to me that the S3 configuration and permissions are valid. This was run on one of our worker instances.
Use Airflow to write to an S3 file:
airflow@airflow-worker-847c66d478-lbcn2:~$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow)
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
airflow@airflow-worker-847c66d478-lbcn2:~$ python
Python 3.6.4 (default, Dec 21 2017, 01:37:56)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> s3 = airflow.hooks.S3Hook('s3_logs')
/usr/local/lib/python3.6/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing S3Hook directly from <module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'> has been deprecated. Please import from '<module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
>>> s3.load_string('put this in s3 file', airflow.conf.get('core', 'remote_base_log_folder') + "/airflow-test")
[2018-02-23 18:43:58,437] {{base_hook.py:80}} INFO - Using connection to: vevo-dev-us-east-1-services-airflow
Now let's retrieve the file from S3 and look at the contents. Everything looks good here.
root@4f8171d4fe47:/# aws s3 cp s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test .
download: s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test to ./airflow-test
root@4f8171d4fe47:/# cat airflow-test
put this in s3 fileroot@4f8171d4fe47:/stringer#
So the Airflow S3 connection seems to be good, except that Airflow jobs do not use S3 for logging. Here are the settings I have; something here must be either wrong or missing.
The env vars of the running worker/scheduler/master instances are:
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep -i s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
S3_BUCKET=vevo-dev-us-east-1-services-airflow
This shows that the s3_logs connection exists in Airflow:
airflow@airflow-worker-847c66d478-lbcn2:~$ airflow connections -l|grep s3
│ 's3_logs' │ 's3' │ 'vevo-dev-us-...vices-airflow' │ None │ False │ False │ None │
I put this file https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py in place in my Docker image. You can see it here on one of our workers:
airflow@airflow-worker-847c66d478-lbcn2:~$ ls -al /usr/local/airflow/config/
total 32
drwxr-xr-x. 2 root root 4096 Feb 23 00:39 .
drwxr-xr-x. 1 airflow airflow 4096 Feb 23 00:53 ..
-rw-r--r--. 1 root root 4471 Feb 23 00:25 airflow_local_settings.py
-rw-r--r--. 1 root root 0 Feb 16 21:35 __init__.py
We have edited the file to define the REMOTE_BASE_LOG_FOLDER variable. Here is the diff between our version and the upstream version:
index 899e815..897d2fd 100644
--- a/var/tmp/file
+++ b/config/airflow_local_settings.py
@@ -35,7 +35,8 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
-REMOTE_BASE_LOG_FOLDER = ''
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')
+
DEFAULT_LOGGING_CONFIG = {
'version': 1,
Here you can see that the setting is correct on one of our workers.
>>> import airflow
>>> airflow.conf.get('core', 'remote_base_log_folder')
's3://vevo-dev-us-east-1-services-airflow/logs/'
Based on the fact that REMOTE_BASE_LOG_FOLDER starts with 's3' and REMOTE_LOGGING is True
>>> airflow.conf.get('core', 'remote_logging')
'True'
I would expect this block https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py#L122-L123 to evaluate to true and make the logs go to s3.
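For reference, the gate I expect to fire there is roughly the following (my paraphrase of the upstream template, not the exact code, so the variable names may differ slightly from the real file):
if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    # replace the file-based task handlers with the S3-backed ones
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])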
Can anyone who has S3 logging working on 1.9 point out what I am missing? I would like to submit a PR to the upstream project to update the docs, since this seems to be a pretty common problem and, as near as I can tell, the upstream documents are either inaccurate or frequently misinterpreted.
Thanks! G.
Upvotes: 2
Views: 2902
Reputation: 18824
When deploying to Kubernetes with the official Helm chart, I had to add the remote logging config to the worker pods as well. So this wasn't enough:
AIRFLOW__CORE__REMOTE_LOGGING: True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'
I also had to pass these vars to the workers
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOGGING: True
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'
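To confirm the forwarding actually took effect, here is a quick sanity check you can run inside a worker pod (just a sketch, nothing Helm- or chart-specific):
# print the remote-logging settings as the worker process actually sees them
import os

for name in ("AIRFLOW__CORE__REMOTE_LOGGING",
             "AIRFLOW__CORE__REMOTE_LOG_CONN_ID",
             "AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER"):
    print(name, "=", os.environ.get(name, "<missing>"))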
Upvotes: 0
Reputation: 1704
Yeah, I also had trouble setting it up based just on the docs; I had to go through Airflow's code to figure it out. There are several things you may have missed.
Some things to check:
1. Make sure you have the log_config.py file and it is in the correct dir: ./config/log_config.py. Also make sure you didn't forget the __init__.py file in that dir.
2. Make sure you defined the s3.task handler and set its formatter to airflow.task.
3. Make sure you pointed the airflow.task and airflow.task_runner loggers at the s3.task handler.
Here is a log_config.py file that works for me:
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from airflow import configuration as conf
# TO DO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
S3_LOG_FOLDER = 's3://your_path_to_airflow_logs'
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        # 'gcs.task': {
        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        #     'formatter': 'airflow.task',
        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        #     'gcs_log_folder': GCS_LOG_FOLDER,
        #     'filename_template': FILENAME_TEMPLATE,
        # },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
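Once the file is in place, I'd also verify that Airflow actually picked it up. Assuming airflow.cfg (or the equivalent AIRFLOW__CORE__* env vars) points logging_config_class at this module and task_log_reader at s3.task, a quick check from a Python shell on a worker looks like this (just a sketch; the expected values depend on your setup):
# confirm which logging config class and task log reader Airflow resolved
from airflow import configuration as conf

print(conf.get('core', 'logging_config_class'))  # e.g. log_config.LOGGING_CONFIG
print(conf.get('core', 'task_log_reader'))       # e.g. s3.task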
Upvotes: 3