nologin

Reputation: 41

Monix Observable groupBy large number of keys without memory leaks

I am trying to split a single Observable in Monix by key, then keep up to the last n events in every GroupedObservable and send them on for further processing. The problem is that the number of keys to group on is potentially infinite, and that causes a memory leak.

Context of application:

I have a Kafka stream with messages from many conversations. Each conversation has a roomId, and I want to group by this id to get a collection of Observables, each containing only the messages from a single conversation. Conversation rooms are usually short-lived: a new conversation is created with a unique roomId, a few dozen messages are exchanged in a short period of time, then the conversation is closed. To avoid memory leaks I want to keep buffers for only the 100-1000 most recent conversations and drop older ones. So if an event arrives from a long-unseen conversation, it will be treated as a new conversation, because the buffer with its previous messages will have been forgotten.

The groupBy method in Monix has a keysBuffer argument that specifies how to deal with the key buffer.

I thought that setting keysBuffer to the DropOld strategy would let me achieve the behavior I wanted.

Below is a simplified version of the described use case.

import monix.execution.Scheduler.Implicits.global
import monix.reactive._

import scala.concurrent.duration._
import scala.util.Random

case class Event(key: Key, value: String, seqNr: Int) {
  override def toString: String = s"(k:$key;s:$seqNr)"
}

case class Key(conversationId: Int, messageNr: Int)

object Main {
  def main(args: Array[String]): Unit = {

    val fakeConsumer = Consumer.foreach(println)
    // simulate a Kafka stream: one heavy event every millisecond
    val kafkaSimulator = Observable.interval(1.millisecond)
      .map(n => generateHeavyEvent(n.toInt))

    // group by key, bounding the key buffer with the DropOld(50) strategy,
    // then merge the per-key sliding windows back into a single stream
    val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
      .mergeMap(slidingWindow)

    groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
  }

  // emits, after each new element, the list of (at most) the last 5 elements seen so far
  def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
    source.scan(List.empty[T])(fixedSizeList)

  // append the new element and keep only the 5 most recent ones
  def fixedSizeList[T](list: List[T], elem: T): List[T] =
    (list :+ elem).takeRight(5)

  def generateHeavyEvent(n: Int): Event = {
    // 500 consecutive events share the same conversationId
    val conversationId: Int = n / 500
    val messageNr: Int = n % 5
    val key = Key(conversationId, messageNr)
    // ~1000 random printable characters make each event heavy
    val value = (1 to 1000).map(_ => Random.nextPrintableChar()).mkString
    Event(key, value, n)
  }

}

However, observing the application heap in VisualVM indicates a memory leak. After about 30 minutes of running, I got java.lang.OutOfMemoryError: GC overhead limit exceeded.

Below is a screenshot of the heap usage plot from running my app for about 30 minutes. (The flat part at the end is after the OutOfMemoryError.)

VisualVM Heap plot of application

My question is: how can I group events in Monix by a potentially infinite number of keys without leaking memory? Dropping old keys is acceptable.


Upvotes: 4

Views: 487

Answers (1)

atl

Reputation: 326

I have a similar use case to yours: reading a Kafka stream and grouping by id.

What you want to do is time out / clean up the GroupedObservable when there is no demand; otherwise it will sit in memory forever. So you can do something like this:

import monix.eval.Task
import monix.reactive.Observable

import scala.concurrent.duration._

val eventsStream: Observable[Int] = ???

eventsStream
  .groupBy(_ % 2 == 0)
  .mergeMap {
    // if a group sees no new upstream events for 5 minutes, switch to an empty
    // Observable, completing the group so it can be garbage collected
    _.mapEval(s => Task.delay(println(s)))
     .timeoutOnSlowUpstreamTo(5.minutes, Observable.empty)
  }
  .completedL
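
Applied to the pipeline from the question, a minimal sketch (reusing kafkaSimulator and slidingWindow from your code; the 5-minute idle timeout is only an example value) could look like this:

val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
  .mergeMap { group =>
    // complete each per-key group after 5 minutes without new messages, so its
    // sliding-window state can be garbage collected; a later message with the
    // same key simply starts a fresh group
    slidingWindow(group)
      .timeoutOnSlowUpstreamTo(5.minutes, Observable.empty)
  }

Pick the timeout to be longer than the expected gap between messages within a live conversation, so active rooms are not cut off prematurely.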

Upvotes: 2
