Logger connected to CloudWatch doesn't work when trying to send logs using localstack inside a Dagster unit test

I've been trying to send logs to a localstack Docker container inside a Dagster Unit/Integration Test, with the following configurations:

For the custom logger:

@logger(
    {
        "name": Field(str, is_required=False, default_value="dagster"),
        "log_level": Field(str, is_required=False, default_value="DEBUG"),
        "log_group_name": Field(str, description="The name of the log group"),
        "log_stream_name": Field(str, description="The prefix of the log stream name"),
    },
    description="A CloudWatch console logger.",
)
def cloudwatch_logger(init_context: "InitLoggerContext"):
    """Build a logger that forwards Dagster log records to AWS CloudWatch.

    Parameters
    ----------
    init_context : InitLoggerContext
        Context whose ``logger_config`` supplies ``name``, ``log_level``,
        ``log_group_name`` and ``log_stream_name``.

    Returns
    -------
    logging.Logger
        A logger carrying a single ``CloudwatchLogsHandler`` whose formatter
        is a ``JsonLogFormatter``.
    """
    config = init_context.logger_config
    log_level = coerce_valid_log_level(config["log_level"])

    # Instantiate the logger class directly (rather than logging.getLogger)
    # so every call yields a fresh logger object.
    cloudwatch_log = logging.getLoggerClass()(config["name"], level=log_level)

    # The handler is given the same threshold as the logger itself.
    cloudwatch_handler = CloudwatchLogsHandler(
        config["log_group_name"],
        config["log_stream_name"],
        level=log_level,
    )
    cloudwatch_handler.setFormatter(JsonLogFormatter())

    cloudwatch_log.addHandler(cloudwatch_handler)
    return cloudwatch_log

The conftest.py file:

import pytest
import subprocess
import os
import boto3
import logging
from enum import IntEnum


@pytest.fixture(scope="session", autouse=True)
def initialize_localstack():
    """Start the ``localstack-service`` container for the whole test session.

    The container is brought up with ``docker-compose`` before the first test
    runs and force-removed with ``docker rm -f`` once the session is over.

    Raises
    ------
    subprocess.CalledProcessError
        If ``docker-compose up`` exits with a non-zero status.
    """
    # Options are placed before the service name so they are parsed as
    # options regardless of how strictly the installed CLI orders arguments.
    # check=True aborts the session right away when the container cannot be
    # started, instead of letting every test fail later on connection errors.
    subprocess.run(
        ["docker-compose", "up", "-d", "--build", "localstack-service"],
        check=True,
    )

    yield

    # Best-effort cleanup: a failed removal should not fail the test session.
    subprocess.run(
        ["docker", "rm", "-f", "localstack-service"],
        check=False,
    )

@pytest.fixture(scope="session", autouse=False)
def cloud_client(initialize_localstack):
    """Yield a boto3 CloudWatch Logs client with the test group/stream created.

    The endpoint and region are read from the ``AWS__ENDPOINT_URL`` and
    ``AWS__REGION_NAME`` environment variables. The log group and log stream
    used by the tests are created before the client is yielded and deleted
    again when the session ends.

    Parameters
    ----------
    initialize_localstack
        Requested only to make sure the LocalStack container fixture has run
        first; its value is not used.

    Yields
    ------
    botocore.client.BaseClient
        The ``logs`` client returned by ``boto3.client``.
    """
    # TODO: Obtain the environment variables using the 'settings' module.
    cloudwatch_client = boto3.client(
        "logs",
        endpoint_url=os.environ.get("AWS__ENDPOINT_URL"),
        region_name=os.environ.get("AWS__REGION_NAME"),
    )

    LOG_GROUP = "dagster-test-log-group"
    LOG_STREAM = "dagster-test-log-stream"

    # Each resource is guarded on its own: with a single shared ``try``, an
    # already-existing group would raise before the stream was ever created.
    try:
        cloudwatch_client.create_log_group(logGroupName=LOG_GROUP)
    except cloudwatch_client.exceptions.ResourceAlreadyExistsException:
        pass

    try:
        cloudwatch_client.create_log_stream(
            logGroupName=LOG_GROUP,
            logStreamName=LOG_STREAM,
        )
    except cloudwatch_client.exceptions.ResourceAlreadyExistsException:
        pass

    yield cloudwatch_client

    cloudwatch_client.delete_log_stream(
        logGroupName=LOG_GROUP,
        logStreamName=LOG_STREAM,
    )

    cloudwatch_client.delete_log_group(
        logGroupName=LOG_GROUP,
    )

The tests.py file:

def test_dummy_op(cloud_client):
    context = build_init_logger_context(
        logger_config={
            "name": "dagster-logger",
            "log_level": "DEBUG",
            "log_group_name": "dagster-test-log-group",
            "log_stream_name": "dagster-test-log-stream",
        }
    )

    test_logger = cloudwatch_logger(context)
    test_logger.info("Hello from inside the test!")

The strange part of this flow is that when I run the Dagster server and trigger an op with this logger configuration, the logs are sent to CloudWatch successfully, but when I try to check these logs from inside the unit test above, the command:

 awslocal logs filter-log-events --log-group-name dagster-test-log-group

Returns zero events:

{
    "events": [],
    "searchedLogStreams": [
        {
            "logStreamName": "dagster-test-log-stream",
            "searchedCompletely": true
        },
        {
            "logStreamName": "dagster-test-log-stream/323032352d30312d30382d3136",
            "searchedCompletely": true
        }
    ]
}

What's happening here?

Upvotes: 0

Views: 27

Answers (0)

Related Questions