Reputation: 41
I am trying to split a single Observable
in Monix by key, then keep up to the last n
events in every GroupedObservable
and send them on for further processing. The problem is that the number of keys to group on is possibly infinite, and that causes memory leaks.
Application context:
I have a Kafka stream with messages from many conversations. Each conversation has a roomId
, and I want to group by this id to get a collection of Observables, each containing only the messages from a single conversation.
Conversation rooms are usually short-lived: a new conversation is created with a unique roomId
, a few dozen messages are exchanged in a short period of time, and then the conversation is closed.
To avoid memory leaks I want to keep buffers for only the 100-1000 most recent conversations and drop older ones. So if an event arrives from a conversation not seen for a long time, it will be treated as a new conversation, because the buffer with its previous messages will have been forgotten.
The groupBy method in Monix has a keysBuffer
argument that specifies how to deal with key buffers.
I thought that setting keysBuffer
to the DropOld strategy would let me achieve the behavior I wanted.
Below is a simplified version of the described use case:
import monix.execution.Scheduler.Implicits.global
import monix.reactive._

import scala.concurrent.duration._
import scala.util.Random

case class Event(key: Key, value: String, seqNr: Int) {
  override def toString: String = s"(k:$key;s:$seqNr)"
}

case class Key(conversationId: Int, messageNr: Int)

object Main {
  def main(args: Array[String]): Unit = {
    val fakeConsumer = Consumer.foreach(println)

    // Simulates a Kafka stream: one heavy event every millisecond.
    val kafkaSimulator = Observable.interval(1.millisecond)
      .map(n => generateHeavyEvent(n.toInt))

    val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
      .mergeMap(slidingWindow)

    groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
  }

  // Emits, for every incoming element, the last (up to) 5 elements seen so far.
  def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
    source.scan(List.empty[T])(fixedSizeList)

  def fixedSizeList[T](list: List[T], elem: T): List[T] =
    (list :+ elem).takeRight(5)

  // A new conversationId appears every 500 events, simulating short-lived rooms.
  def generateHeavyEvent(n: Int): Event = {
    val conversationId: Int = n / 500
    val messageNr: Int = n % 5
    val key = Key(conversationId, messageNr)
    val value = (1 to 1000).map(_ => Random.nextPrintableChar()).mkString // 1000-character random payload
    Event(key, value, n)
  }
}
However, observing the application heap in VisualVM indicates a memory leak. After about 30 minutes of running, I got java.lang.OutOfMemoryError: GC overhead limit exceeded.
Below is a screenshot of the heap usage plot from running my app for about 30 minutes (the flattened part at the end is after the OutOfMemoryError):
[VisualVM heap plot of the application]
My question is: how can I group events in Monix by a possibly infinite number of keys without leaking memory? Old keys are allowed to be dropped.
Background info:
Monix version: 3.0.0-RC2
Scala version: 2.12.8
Upvotes: 4
Views: 487
Reputation: 326
I have a similar use case to yours: reading a Kafka stream and grouping by id.
What you want to do is to time out / clean up the GroupedObservable
when there is no demand; otherwise it will sit in memory forever. So you can do something like this:
import monix.eval.Task
import monix.reactive.Observable
import scala.concurrent.duration._

val eventsStream: Observable[Int] = ???

eventsStream
  .groupBy(_ % 2 == 0)
  .mergeMap {
    _.mapEval(s => Task.delay(println(s)))
      // complete the group after 5 minutes without new elements, so it can be cleaned up
      .timeoutOnSlowUpstreamTo(5.minutes, Observable.empty)
  }
  .completedL
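Applied to the pipeline from the question, the same idea could look roughly like this (just a sketch reusing kafkaSimulator, slidingWindow and the imports from your code; the 30-second idle timeout is an arbitrary value, tune it to how long an inactive conversation should stay buffered):

val groupedMessages = kafkaSimulator
  .groupBy(_.key)(OverflowStrategy.DropOld(50))
  .mergeMap { group =>
    // Once a conversation has been idle for 30 seconds, complete its stream
    // so the per-group sliding-window state can be garbage collected.
    slidingWindow(group.timeoutOnSlowUpstreamTo(30.seconds, Observable.empty))
  }

If a message with the same key arrives later, groupBy should hand out a fresh group, which matches the "treat it as a new conversation" behavior described in the question.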
Upvotes: 2