Paul Velthuis

Reputation: 335

Airflow Google Cloud Logging

For Apache Airflow v1.10 running on Python 2.7, installed with `pip install airflow[gcp_api]`, I am trying to set up logging to Google Cloud Storage. I have the following log_config.py file:

import os

from airflow import configuration as conf

GCS_LOG_FOLDER = 'gs://GCSbucket/'
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'flask_appbuilder': {
            'handlers': ['console'],  # the key must be 'handlers', not 'handler'
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        }
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
    }
}

REMOTE_HANDLERS = {
    's3': {
        'task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'gcs': {
        'task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'wasb': {
        'task': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
        'processor': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
    },
}

# conf.get() returns a string (even 'False' would be truthy), so read the flag as a boolean
REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
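
A sanity check I would add here (my own suggestion, not part of the tutorial): feeding the finished dict to logging.config.dictConfig directly surfaces handler-construction errors with a readable traceback, instead of the confusing atexit failure shown below. This assumes log_config.py is already importable:

import logging.config

from log_config import LOGGING_CONFIG  # assumes log_config.py is on the Python path

# Any bad key or missing constructor argument in a handler block raises
# a ValueError here that names the offending handler.
logging.config.dictConfig(LOGGING_CONFIG)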

My airflow.cfg settings are:

[core]
remote_logging = True
remote_base_log_folder = gs://GCSbucket/logs
remote_log_conn_id = google_cloud_default 

The error I get is the following:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/local/lib/python2.7/logging/__init__.py", line 1676, in shutdown
    h.close()
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/log/gcs_task_handler.py", line 73, in close
    if self.closed:
AttributeError: 'GCSTaskHandler' object has no attribute 'closed'

Does anybody know what might have gone wrong? The tutorial I am following is: https://airflow.readthedocs.io/en/1.10.0/howto/write-logs.html

Update: I did some more research in the source code; there I see that the close method returns nothing, and this is where my application crashes.

https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/utils/log/gcs_task_handler.py

Does somebody know why nothing is returned in

def close(self):
    if self.closed:
        return
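
After more digging, my understanding (unverified) is that the bare return is just an early-exit guard so that logging.shutdown() does not upload the log a second time; the real problem is the AttributeError above it. self.closed is assigned at the end of GCSTaskHandler.__init__, so if the attribute is missing, the handler was registered with the logging machinery but its __init__ never completed, e.g. because of a bad or missing constructor argument in the handler block. A hypothetical reproduction of that mechanism (not the Airflow code itself):

import logging

class DemoHandler(logging.Handler):
    def __init__(self):
        # logging.Handler.__init__ registers the handler for logging.shutdown()
        logging.Handler.__init__(self)
        raise ValueError('simulated bad handler kwargs')
        self.closed = False  # never reached

    def close(self):
        if self.closed:  # AttributeError here at interpreter exit
            return
        logging.Handler.close(self)

handler = DemoHandler.__new__(DemoHandler)  # keep a strong reference alive
try:
    handler.__init__()
except ValueError:
    pass
# At interpreter exit, logging.shutdown() calls handler.close() and fails with:
#   AttributeError: 'DemoHandler' object has no attribute 'closed'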

Upvotes: 0

Views: 2846

Answers (2)

Paul Velthuis

Reputation: 335

To resolve this issue I added the following to the [core] section of airflow.cfg:

[core]
log_filename_template =  {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log

# Log format
# the literal % signs are doubled (%%) because ConfigParser treats a single % as interpolation
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
task_log_reader = gcs.task
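
For logging_config_class to work, log_config.py must be importable from the Python path of the webserver and scheduler (the 1.10 tutorial suggests $AIRFLOW_HOME/config with an empty __init__.py next to it). A quick check that mimics what Airflow does internally; the module/attribute split below is my sketch, not the exact Airflow code:

import importlib

# Resolve the same dotted path that logging_config_class points at.
module_name, attr = 'log_config.LOGGING_CONFIG'.rsplit('.', 1)
LOGGING_CONFIG = getattr(importlib.import_module(module_name), attr)
print(sorted(LOGGING_CONFIG['handlers']))  # should include 'gcs.task'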

In log_config.LOGGING_CONFIG I added the following handler:

    # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    'gcs.task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': GCS_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    }
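
This handler only takes effect because the airflow.task and airflow.task_runner loggers reference it (as in the LOGGING_CONFIG shown in the question) and because task_log_reader = gcs.task matches the handler name; schematically:

# The logger entries must point at the new handler; 'gcs.task' here must
# match both the handler key above and task_log_reader in airflow.cfg.
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['gcs.task']
LOGGING_CONFIG['loggers']['airflow.task_runner']['handlers'] = ['gcs.task']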

The tutorial I followed is: https://airflow.readthedocs.io/en/1.10.0/howto/write-logs.html

Upvotes: 1

kaxil

Reputation: 18844

The instructions you followed might be outdated. Please try the instructions from the following link:

https://airflow.readthedocs.io/en/latest/howto/write-logs.html#writing-logs-to-google-cloud-storage

Follow the steps below to enable Google Cloud Storage logging.

To enable this feature, airflow.cfg must be configured as in this example:

[core]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
  1. Install the gcp_api package first, like so: pip install apache-airflow[gcp_api].
  2. Make sure a Google Cloud Platform connection hook has been defined in Airflow. The hook should have read and write access to the Google Cloud Storage bucket defined above in remote_base_log_folder (a sketch for creating such a connection follows the log excerpt below).
  3. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
  4. Verify that logs are showing up for newly executed tasks in the bucket you’ve defined.
  5. Verify that the Google Cloud Storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:


*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
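
For step 2, one way to create that connection programmatically is sketched below. This is a hedged example rather than part of the linked docs; the key-file path and project id are placeholders:

import json

from airflow import settings
from airflow.models import Connection

# Create a google_cloud_platform connection named like remote_log_conn_id.
session = settings.Session()
session.add(Connection(
    conn_id='MyGCSConn',
    conn_type='google_cloud_platform',
    extra=json.dumps({
        'extra__google_cloud_platform__key_path': '/path/to/keyfile.json',  # placeholder
        'extra__google_cloud_platform__project': 'my-project',              # placeholder
        'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/cloud-platform',
    }),
))
session.commit()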

Upvotes: 1
