Mojo
Mojo

Reputation: 1202

Why Logging is Not Working for Akka Stream

I am using Alpakka and have toy example below:

val system = ActorSystem("system")
implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)

implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
implicit val ec: ExecutionContextExecutor = system.dispatcher

val log = Logger(this.getClass, "Foo")

val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")

def start() = {
  Consumer.plainSource(consumerSettings, Subscriptions.topics("test"))
    .log("My Consumer: ")
    .withAttributes(
      Attributes.logLevels(
        onElement = Logging.InfoLevel,
        onFinish = Logging.InfoLevel,
        onFailure = Logging.DebugLevel
      )
    )
    .filter(//some predicate)
    .map(// some process)
    .map(out => ByteString(out))
    .runWith(LogRotatorSink(timeFunc))
    .onComplete {
      case Success(_) => log.info("DONE")
      case Failure(e) => log.error("ERROR")
    }
}

This code is working. But I having problem with logging. The first part with attributes is logging fine. When element coming in, it makes log to standard output. But when LogRotatorSink is finishing and future is completing I want to print DONE to standard output. This is not working. File is being produced so process is working but no "DONE" messaging to standard output.

How I get "DONE" to standard output please?

akka {

  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "INFO"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "INFO"

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

}


<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%highlight(%date{HH:mm:ss.SSS} %-5level %-50.50([%logger{50}])) - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="org.apache.kafka" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>

Upvotes: 0

Views: 2438

Answers (1)

Mateusz Kubuszok
Mateusz Kubuszok

Reputation: 27595

Log is working - it's your Future which is not ending, because Kafka Consumer is an infinite stream - when it will read everything and reach the newest messages in topic... it will wait for new messages to appear - in many cases e.g. evens sourcing closing such stream out of the blue would be a disaster, so infinitely running stream as a default is the sane choice.

When should this stream actually end? Define this condition clearly and you will be able to use something like .take(n), .takeUntil(cond), .takeWithin(time) to close it on explicitly defined conditions. Then stream will close, Future will complete and your DONE will get printed.

Upvotes: 3

Related Questions