I've been trying to send logs to a LocalStack Docker container from inside a Dagster unit/integration test, with the following configuration.
The custom logger:
```python
@logger(
    {
        "name": Field(str, is_required=False, default_value="dagster"),
        "log_level": Field(str, is_required=False, default_value="DEBUG"),
        "log_group_name": Field(str, description="The name of the log group"),
        "log_stream_name": Field(
            str, description="The prefix of the log stream name"
        ),
    },
    description="A CloudWatch console logger.",
)
def cloudwatch_logger(init_context: "InitLoggerContext"):
    """Logger that emits dagster logs to AWS CloudWatch.

    Parameters
    ----------
    init_context : InitLoggerContext

    Returns
    -------
    logging.Logger
    """
    level = coerce_valid_log_level(init_context.logger_config["log_level"])
    name = init_context.logger_config["name"]
    logger_class = logging.getLoggerClass()
    logger = logger_class(name, level=level)
    handler = CloudwatchLogsHandler(
        init_context.logger_config["log_group_name"],
        init_context.logger_config["log_stream_name"],
        level=level,
    )
    handler.setFormatter(JsonLogFormatter())
    logger.addHandler(handler)
    return logger
```
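For context, when this works (running through the Dagster server, as described further down), the logger is attached to a job roughly like this. The op and job names here are illustrative, not the exact production code:

```python
from dagster import job, op


@op
def dummy_op(context):
    context.log.info("Hello from inside the op!")


@job(logger_defs={"cloudwatch_logger": cloudwatch_logger})
def dummy_job():
    dummy_op()


# Run configuration used when launching the job from the Dagster UI:
run_config = {
    "loggers": {
        "cloudwatch_logger": {
            "config": {
                "log_group_name": "dagster-test-log-group",
                "log_stream_name": "dagster-test-log-stream",
            }
        }
    }
}
```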
The conftest.py file:
```python
import logging
import os
import subprocess
from enum import IntEnum

import boto3
import pytest


@pytest.fixture(scope="session", autouse=True)
def initialize_localstack():
    subprocess.run(
        [
            "docker-compose",
            "up",
            "localstack-service",
            "-d",
            "--build",
        ]
    )
    yield
    subprocess.run(
        [
            "docker",
            "rm",
            "-f",
            "localstack-service",
        ]
    )


@pytest.fixture(scope="session", autouse=False)
def cloud_client(initialize_localstack):
    # TODO: Obtain the environment variables using the 'settings' module.
    cloudwatch_client = boto3.client(
        "logs",
        endpoint_url=os.environ.get("AWS__ENDPOINT_URL"),
        region_name=os.environ.get("AWS__REGION_NAME"),
    )
    LOG_GROUP = "dagster-test-log-group"
    LOG_STREAM = "dagster-test-log-stream"
    try:
        cloudwatch_client.create_log_group(logGroupName=LOG_GROUP)
        cloudwatch_client.create_log_stream(
            logGroupName=LOG_GROUP,
            logStreamName=LOG_STREAM,
        )
    except cloudwatch_client.exceptions.ResourceAlreadyExistsException:
        pass
    yield cloudwatch_client
    cloudwatch_client.delete_log_stream(
        logGroupName=LOG_GROUP,
        logStreamName=LOG_STREAM,
    )
    cloudwatch_client.delete_log_group(logGroupName=LOG_GROUP)
```
The tests.py file:

```python
def test_dummy_op(cloud_client):
    context = build_init_logger_context(
        logger_config={
            "name": "dagster-logger",
            "log_level": "DEBUG",
            "log_group_name": "dagster-test-log-group",
            "log_stream_name": "dagster-test-log-stream",
        }
    )
    test_logger = cloudwatch_logger(context)
    test_logger.info("Hello from inside the test!")
```
The strange part of this flow is that when I run the Dagster server and trigger an op with this logger configuration, the logs are sent to CloudWatch successfully. But when I try to check for these logs after running the unit test above, the command:

```
awslocal logs filter-log-events --log-group-name dagster-test-log-group
```

returns zero events:
```json
{
    "events": [],
    "searchedLogStreams": [
        {
            "logStreamName": "dagster-test-log-stream",
            "searchedCompletely": true
        },
        {
            "logStreamName": "dagster-test-log-stream/323032352d30312d30382d3136",
            "searchedCompletely": true
        }
    ]
}
```
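(For completeness, the same query can also be issued through the boto3 client from the `cloud_client` fixture, which is the assertion I would ultimately like to make inside the test. A minimal sketch:)

```python
def test_logs_reach_localstack(cloud_client):
    # Query the same log group that the custom logger writes to.
    response = cloud_client.filter_log_events(
        logGroupName="dagster-test-log-group",
    )
    assert any(
        "Hello from inside the test!" in event["message"]
        for event in response["events"]
    )
```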
What is happening here?