Toaditoad

Reputation: 304

Running Akka Streams stages in parallel dramatically increases memory pressure

I'm trying to implement an Akka Stream that reads frames from a video file and applies an SVM classifier to detect objects in each frame. The detection can run in parallel because the order of the video frames does not matter. My idea is to create a graph following the Akka Streams Cookbook (Balancing jobs to a fixed pool of workers) with two detection stages marked as .async.

It works as expected to a certain extent, but I noticed that the memory pressure on my system (only 8 GB available) increases dramatically and goes off the charts, slowing the system down significantly. Compared with a different approach that uses .mapAsync (Akka Docs) to integrate even three actors performing the object detection into the stream, the memory pressure is significantly lower.

What am I missing? Why does running two stages in parallel increase the memory pressure so much, while three actors running in parallel seem to work fine?

Additional remarks: I'm using OpenCV for reading the video file. Due to the 4K resolution, each video frame of type Mat is about 26.5 MB.
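(For reference, that figure matches, e.g., DCI 4K at 8-bit BGR: 4096 × 2160 pixels × 3 bytes per pixel ≈ 26.5 MB per Mat.)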

Running two stages in parallel with .async dramatically increasing memory pressure

implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(actorSystem)
    .withInputBuffer(initialSize = 1, maxSize = 1) // cap every stage's input buffer at a single element
    .withOutputBurstLimit(1)                       // emit at most one element per output burst
    .withSyncProcessingLimit(2)
)

val greyscaleConversion: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].map { el => Frame(el.videoPos, FrameTransformation.transformToGreyscale(el.frame)) }

val objectDetection: Flow[Frame, DetectedObjectPos, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val numberOfDetectors = 2
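    // waitForAllDownstreams = true: Balance waits until both detector branches
    // have signalled demand before it emits the first frame.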
    val frameBalance: UniformFanOutShape[Frame, Frame] = builder.add(Balance[Frame](numberOfDetectors, waitForAllDownstreams = true))
    val detectionMerge: UniformFanInShape[DetectedObjectPos, DetectedObjectPos] = builder.add(Merge[DetectedObjectPos](numberOfDetectors))

    for (i <- 0 until numberOfDetectors) {
      val detectionFlow: Flow[Frame, DetectedObjectPos, NotUsed] = Flow[Frame].map { greyFrame =>
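        // NOTE: a fresh CascadeClassifier is instantiated and its XML model
        // re-loaded from disk for every single frame passing through this stage.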
        val classifier = new CascadeClassifier()
        classifier.load("classifier.xml")
        val detectedObjects: MatOfRect = new MatOfRect()
        classifier.detectMultiScale(greyFrame.frame, detectedObjects, 1.08, 5, 0 | Objdetect.CASCADE_SCALE_IMAGE, new Size(40, 20), new Size(100, 80))
        DetectedObjectPos(greyFrame.videoPos, detectedObjects)
      }

      frameBalance.out(i) ~> detectionFlow.async ~> detectionMerge.in(i)
    }

    FlowShape(frameBalance.in, detectionMerge.out)
  })

def createGraph(videoFile: Video): RunnableGraph[NotUsed] = {
  Source.fromGraph(new VideoSource(videoFile))
    .via(greyscaleConversion).async
    .via(objectDetection)
    .to(Sink.foreach(detectionDisplayActor ! _))
}

Integrating actors with .mapAsync not increasing memory pressure

val greyscaleConversion: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].map { el => Frame(el.videoPos, FrameTransformation.transformToGreyscale(el.frame)) }

val numberOfDetectors = 3

// The ask (?) below needs an implicit timeout; the value here is illustrative.
implicit val askTimeout: Timeout = Timeout(2.seconds)

val detectionRouter: ActorRef =
  actorSystem.actorOf(RandomPool(numberOfDetectors).props(Props[DetectionActor]), "detectionRouter")

val detectionFlow: Flow[Frame, DetectedObjectPos, NotUsed] =
  Flow[Frame].mapAsyncUnordered(parallelism = 3)(el => (detectionRouter ? el).mapTo[DetectedObjectPos])

def createGraph(videoFile: Video): RunnableGraph[NotUsed] = {
  Source.fromGraph(new VideoSource(videoFile))
    .via(greyscaleConversion)
    .via(detectionFlow)
    .to(Sink.foreach(detectionDisplayActor ! _))
}
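
For context, a minimal sketch of the DetectionActor behind the pool (simplified; note that in this variant the classifier is loaded once when the actor starts, not once per frame):

class DetectionActor extends Actor {
  // Created and loaded once per actor instance.
  private val classifier = new CascadeClassifier()
  classifier.load("classifier.xml")

  override def receive: Receive = {
    case greyFrame: Frame =>
      val detectedObjects = new MatOfRect()
      classifier.detectMultiScale(greyFrame.frame, detectedObjects, 1.08, 5,
        0 | Objdetect.CASCADE_SCALE_IMAGE, new Size(40, 20), new Size(100, 80))
      sender() ! DetectedObjectPos(greyFrame.videoPos, detectedObjects)
  }
}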

Upvotes: 2

Views: 365

Answers (0)
