Guido Pepper

Reputation: 171

Airflow 1.9 - Cannot get logs to write to S3

I'm running Airflow 1.9 on Kubernetes in AWS. I would like the logs to go to S3, as the Airflow containers themselves are not long-lived.

I've read the various threads and documents that describe the process, but I still cannot get it working. First, a test that demonstrates to me that the S3 configuration and permissions are valid. This is run on one of our worker instances.

Using Airflow to write to an S3 file:

airflow@airflow-worker-847c66d478-lbcn2:~$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow)
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
airflow@airflow-worker-847c66d478-lbcn2:~$ python
Python 3.6.4 (default, Dec 21 2017, 01:37:56)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> s3 = airflow.hooks.S3Hook('s3_logs')
/usr/local/lib/python3.6/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing S3Hook directly from <module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'> has been deprecated. Please import from '<module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
>>> s3.load_string('put this in s3 file', airflow.conf.get('core', 'remote_base_log_folder') + "/airflow-test")
[2018-02-23 18:43:58,437] {{base_hook.py:80}} INFO - Using connection to: vevo-dev-us-east-1-services-airflow

Now let's retrieve the file from S3 and look at the contents. Everything looks good here.

root@4f8171d4fe47:/# aws s3 cp s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test .
download: s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test to ./airflow-test
root@4f8171d4fe47:/# cat airflow-test
put this in s3 fileroot@4f8171d4fe47:/stringer#

So the Airflow S3 connection seems to be good, yet Airflow jobs do not use S3 for logging. Here are my settings; I figure something in them is either wrong or missing.

The env vars of the running worker/scheduler/master instances are:

airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep -i s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
S3_BUCKET=vevo-dev-us-east-1-services-airflow

This shows that the s3_logs connection exists in Airflow:

airflow@airflow-worker-847c66d478-lbcn2:~$ airflow connections -l|grep s3
│ 's3_logs'              │ 's3'                    │ 'vevo-dev-us-...vices-airflow' │ None   │ False          │ False                │ None                           │

I put this file https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py in place in my Docker image. You can see it here on one of our workers:

airflow@airflow-worker-847c66d478-lbcn2:~$ ls -al /usr/local/airflow/config/
total 32
drwxr-xr-x. 2 root    root    4096 Feb 23 00:39 .
drwxr-xr-x. 1 airflow airflow 4096 Feb 23 00:53 ..
-rw-r--r--. 1 root    root    4471 Feb 23 00:25 airflow_local_settings.py
-rw-r--r--. 1 root    root       0 Feb 16 21:35 __init__.py

We have edited the file to define the REMOTE_BASE_LOG_FOLDER variable. Here is the diff between our version and the upstream version:

index 899e815..897d2fd 100644
--- a/var/tmp/file
+++ b/config/airflow_local_settings.py
@@ -35,7 +35,8 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 # Storage bucket url for remote logging
 # s3 buckets should start with "s3://"
 # gcs buckets should start with "gs://"
-REMOTE_BASE_LOG_FOLDER = ''
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')
+

 DEFAULT_LOGGING_CONFIG = {
     'version': 1,

Here you can see that the setting is correct on one of our workers.

>>> import airflow
>>> airflow.conf.get('core', 'remote_base_log_folder')
's3://vevo-dev-us-east-1-services-airflow/logs/'

Given that REMOTE_BASE_LOG_FOLDER starts with 's3' and REMOTE_LOGGING is True

>>> airflow.conf.get('core', 'remote_logging')
'True'

I would expect this block https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py#L122-L123 to evaluate to true and route the logs to S3.
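
For reference, here is a quick sanity check of that condition from a Python shell on the same worker. This is just my own sketch of the check, not the upstream code itself:

>>> import airflow
>>> # mirror the condition I expect the upstream block to test
>>> airflow.conf.getboolean('core', 'remote_logging') and \
...     airflow.conf.get('core', 'remote_base_log_folder').startswith('s3://')
True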

Can anyone who has S3 logging working on 1.9 please point out what I am missing? I would like to submit a PR to the upstream project to update the docs, since this seems to be a pretty common problem, and as near as I can tell the upstream docs are either wrong or easily misinterpreted.

Thanks! G.

Upvotes: 2

Views: 2902

Answers (2)

LiorH

Reputation: 18824

When deploying to Kubernetes with the official Helm chart, I had to add the remote logging config to the worker pods as well. So this was not enough:

  AIRFLOW__CORE__REMOTE_LOGGING: True
  AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
  AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'

I also had to pass these vars to the workers:

  AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOGGING: True
  AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOG_CONN_ID: s3_logs
  AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: 's3://my-log-bucket/logs'
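
As far as I understand it, the AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__... prefix just populates the [kubernetes_environment_variables] section of airflow.cfg, and the Kubernetes executor copies everything in that section into each worker pod it launches; without it, the spawned pods come up with the default local logging settings even though the scheduler/webserver have the remote logging vars.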

Upvotes: 0

Josef Joe Samanek

Reputation: 1704

Yeah, I also had trouble setting it up based just on the docs; I had to go over Airflow's code to figure it out. There are several things you might have missed.

Some things to check:
1. Make sure you have the log_config.py file and that it is in the correct dir: ./config/log_config.py. Also make sure you didn't forget the __init__.py file in that dir (the settings I use to point Airflow at the file are shown right after this list).
2. Make sure you defined the s3.task handler and set its formatter to airflow.task.
3. Make sure you point the airflow.task and airflow.task_runner loggers at the s3.task handler.
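
For point 1, Airflow also has to be told to load that file. In my setup I do it with the settings below, shown in the same env var form you are already using; this is from memory, so double-check the option names against your airflow.cfg:

  AIRFLOW__CORE__LOGGING_CONFIG_CLASS=log_config.LOGGING_CONFIG
  AIRFLOW__CORE__TASK_LOG_READER=s3.task
  AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs

The ./config dir also has to be importable (on PYTHONPATH), which is why the __init__.py matters.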

Here is a log_config.py file that works for me:

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from airflow import configuration as conf

# TO DO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = 's3://your_path_to_airflow_logs'

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        # 'gcs.task': {
        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        #     'formatter': 'airflow.task',
        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        #     'gcs_log_folder': GCS_LOG_FOLDER,
        #     'filename_template': FILENAME_TEMPLATE,
        # },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

Upvotes: 3
