gurghet

Reputation: 7707

How to minimize memory usage in akka streams

I have a stream that at some point groups objects together to create files. I think I can squeeze out some bytes by serializing the objects early in the stream. But my biggest question is how to optimize the memory footprint of a stream like this:

val sourceOfCustomers = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString

sourceOfCustomers
  .map(serializeCustomer) // 1KB
  .grouped(1000000) // 1GB
  .via(processFile) // 1GB
  .via(moreProcessing) // 1GB
  .via(evenMoreProcessing) // 1GB
  .to(fileSink) // 1GB

This gives me a memory usage at steady state of at least 5GB. Is this correct?

What strategy can I use to only limit it to 1 or 2GB? In principle it should be possible by collapsing the operators.

Note: I know a solution is to make the group smaller but let’s consider the size of the group a constraint of the problem.
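The 5GB estimate above can be checked with simple arithmetic. This is a sketch under the question's own assumptions: ~1 KB per serialized customer, 1,000,000 elements per group, and each of the five stages downstream of `grouped` holding its own full copy of a group (in practice Akka Streams passes references between stages, so the real footprint depends on whether each stage materializes new data):

```scala
// Back-of-the-envelope steady-state memory estimate for the stream above.
val bytesPerCustomer = 1024L                          // ~1 KB per serialized element (assumed)
val groupSize        = 1000000L                       // elements per group, from .grouped(1000000)
val bytesPerGroup    = bytesPerCustomer * groupSize   // ~1 GB per group

// grouped, processFile, moreProcessing, evenMoreProcessing, fileSink
val stagesHoldingAGroup = 5
val steadyStateBytes    = bytesPerGroup * stagesHoldingAGroup

println(f"per group:    ${bytesPerGroup / 1e9}%.2f GB")
println(f"steady state: ${steadyStateBytes / 1e9}%.2f GB") // ~5 GB if every stage buffers a full group
```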

Upvotes: 2

Views: 727

Answers (1)

Ivan Kurchenko

Reputation: 4063

Sorry, maybe I'm missing something, but I did not find a group operation in the latest Akka Streams documentation; I guess you mean the grouped operation: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html .

If so, then at .grouped(1000000) // 1GB you create groups of elements that can be handled simultaneously, so more than one 1GB group can be present in memory at the same time. To limit the memory footprint of your stream to about 1GB, you can go one of two ways:

1) Reduce the number of large groups handled simultaneously. This can be achieved with the throttle operation: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle . Please see the code snippet below for an example:

import scala.concurrent.duration._
...

.grouped(1000000) // 1GB
.throttle(1, 1.minute)

2) Reduce the size of each large group:

val parallelismLevel = Runtime.getRuntime.availableProcessors() // or another custom level that reflects the stream's processing parallelism
val baseGroupSize = 1000000 // 1GB
val groupSize = baseGroupSize / parallelismLevel

sourceOfCustomers
  .map(serializeCustomer) // 1KB
  .grouped(groupSize)

Hope this helps!

Upvotes: 4
