Toaditoad
Toaditoad

Reputation: 304

Akka Remoting: Custom Kryo serializer for org.opencv.core.Mat

I'm having trouble to implement a Kryo serializer for a org.opencv.core.Mat object representing e.g. a frame of a video file. The idea is that an Akka ActorSystem A sends a video frame in greyscale to a system B in order to detect objects in it (using akka-remote, not akka-cluster). For that purpose, the frame of type Mat has to be serialized and sent over the network. However, it seems that it is never actually sent and/or it stops the actor of sending heartbeats what causes the system to fail.

Any ideas what to do about it?

I also asked it on Github: https://github.com/romix/akka-kryo-serialization/issues/110

Related, but not answered: kryo serialization over storm

configuration.conf (on system A and B)

serializers {
  kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
  "org.opencv.core.Mat" = kryo
}
kryo  {
  kryo-custom-serializer-init = "de.itd.util.KryoInit"
  type = "nograph"
  idstrategy = "explicit"
  buffer-size = 4096
  max-buffer-size = -1
  use-manifests = true
  use-unsafe = false
  post-serialization-transformations = "lz4"
  kryo-trace = true
  resolve-subclasses = false
}

de.itd.util.KryoInit.scala (on system A and B)

package de.itd.util

import com.esotericsoftware.kryo.Kryo
import org.opencv.core.Mat

class KryoInit {
  def customize(kryo: Kryo): Unit  = {
    kryo.addDefaultSerializer(classOf[Mat], classOf[MatKryoSerializer])
    kryo.register(classOf[Mat], 21)
  }
}

de.itd.util.MatKryoSerializer (on system A and B)

package de.itd.util

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.opencv.core.{CvType, Mat}

class MatKryoSerializer extends Serializer[Mat] {
  override def write(kryo: Kryo, output: Output, m: Mat): Unit = {
    val bufferSize: Int = m.rows * m.cols * m.channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)
    m.get(0, 0, arrayByte)

    output.write(arrayByte)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[Mat]): Mat = {
    val rows = 2160
    val cols = 4096
    val channels = 1
    val bufferSize = rows * cols * channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)

    val frame = new Mat(rows, cols , CvType.CV_8U)
    input.readBytes(arrayByte)
    frame.put(0, 0, arrayByte)

    frame
  }
}

Log of system A (sending a frame to system B)

00:00 TRACE: [kryo] Registration required: true
00:00 TRACE: [kryo] References: false
00:00 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer)
[INFO] [01/31/2017 12:31:48.390] [JavaFX Application Thread] [akka.remote.Remoting] Starting remoting
[INFO] [01/31/2017 12:31:48.598] [JavaFX Application Thread] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552]
[INFO] [01/31/2017 12:31:48.602] [JavaFX Application Thread] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]
12:31:48.633 [JavaFX Application Thread] INFO de.itd.controller.MainViewController - MainActorSystem started.
12:31:49.168 [MainActorSystem-akka.actor.default-dispatcher-2] INFO de.itd.actor.local.MainActor - Detector DetectionActor-0 registered.
12:31:54.788 [JavaFX Application Thread] INFO de.itd.controller.MainViewController - detectCars
12:31:56.318 [MainActorSystem-akka.actor.default-dispatcher-4] INFO de.itd.actor.local.MainActor - DetectionActor-0 asked for a frame.
00:08 TRACE: [kryo.FieldSerializerConfig] useAsm: false
00:08 TRACE: [kryo.FieldSerializerConfig] useAsm: false
00:08 TRACE: [kryo] Register class ID 0: int (com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer)
00:08 TRACE: [kryo] Register class ID 1: String (com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer)
00:08 TRACE: [kryo] Register class ID 2: float (com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer)
00:08 TRACE: [kryo] Register class ID 3: boolean (com.esotericsoftware.kryo.serializers.DefaultSerializers$BooleanSerializer)
00:08 TRACE: [kryo] Register class ID 4: byte (com.esotericsoftware.kryo.serializers.DefaultSerializers$ByteSerializer)
00:08 TRACE: [kryo] Register class ID 5: char (com.esotericsoftware.kryo.serializers.DefaultSerializers$CharSerializer)
00:08 TRACE: [kryo] Register class ID 6: short (com.esotericsoftware.kryo.serializers.DefaultSerializers$ShortSerializer)
00:08 TRACE: [kryo] Register class ID 7: long (com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer)
00:08 TRACE: [kryo] Register class ID 8: double (com.esotericsoftware.kryo.serializers.DefaultSerializers$DoubleSerializer)
00:08 TRACE: [kryo] Register class ID 9: void (com.esotericsoftware.kryo.serializers.DefaultSerializers$VoidSerializer)
00:08 TRACE: [kryo] Register class ID 10: scala.Enumeration$Val (com.romix.scala.serialization.kryo.EnumerationSerializer)
00:08 TRACE: [kryo] Register class ID 11: scala.Enumeration$Value (com.romix.scala.serialization.kryo.EnumerationSerializer)
00:08 TRACE: [kryo] Registration required: true
00:08 TRACE: [kryo] References: false
00:08 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer)
00:08 DEBUG: [kryo] Write: Mat [ 2160*4096*CV_8UC1, isCont=true, isSubmat=false, nativeObj=0x7feb070c0bd0, dataAddr=0x145725020 ]
00:08 TRACE: [kryo] Object graph complete.
[WARN] [01/31/2017 12:32:07.645] [MainActorSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://[email protected]:2552/system/remote-watcher] Detected unreachable: [akka.tcp://[email protected]:2553]
[WARN] [01/31/2017 12:32:07.650] [MainActorSystem-akka.remote.default-remote-dispatcher-13] [akka.remote.Remoting] Association to [akka.tcp://[email protected]:2553] having UID [-664475844] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [01/31/2017 12:32:08.288] [MainActorSystem-akka.actor.default-dispatcher-4] [akka://MainActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteActorSystem%4010.150.20.159%3A2553-1] Message [akka.remote.transport.AssociationHandle$InboundPayload] from Actor[akka://MainActorSystem/deadLetters] to Actor[akka://MainActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteActorSystem%4010.150.20.159%3A2553-1#-764637076] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Log of system B (should receive a frame from system A)

00:00 TRACE: [kryo] Registration required: true
00:00 TRACE: [kryo] References: false
00:00 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer)
[INFO] [01/31/2017 12:31:29.946] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/31/2017 12:31:30.253] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2553]
[INFO] [01/31/2017 12:31:30.255] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2553]
12:31:30.272 [main] INFO de.itd.ui.Main$ - RemoteActorSystem started.
12:31:47.050 [RemoteActorSystem-akka.actor.default-dispatcher-2] INFO de.itd.actor.remote.DetectionGroupActor - Receiving initialization message...
12:31:54.308 [RemoteActorSystem-akka.actor.default-dispatcher-5] INFO de.itd.actor.remote.DetectionActor - Frame is available.
[WARN] [01/31/2017 12:32:06.285] [RemoteActorSystem-akka.remote.default-remote-dispatcher-7] [akka.tcp://[email protected]:2553/system/remote-watcher] Detected unreachable: [akka.tcp://[email protected]:2552]
[WARN] [01/31/2017 12:32:06.291] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.remote.Remoting] Association to [akka.tcp://[email protected]:2552] having UID [-946314302] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [01/31/2017 12:32:06.365] [RemoteActorSystem-akka.actor.default-dispatcher-2] [akka://RemoteActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-2] Message [akka.remote.transport.AssociationHandle$InboundPayload] from Actor[akka://RemoteActorSystem/deadLetters] to Actor[akka://RemoteActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-2#-1039432132] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [01/31/2017 12:32:06.367] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://[email protected]:2553/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-1/endpointWriter] AssociationError [akka.tcp://[email protected]:2553] -> [akka.tcp://[email protected]:2552]: Error [Invalid address: akka.tcp://[email protected]:2552] [
akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:2552
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has a UID that has been quarantined. Association aborted.
]
[WARN] [01/31/2017 12:32:06.371] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.remote.Remoting] Tried to associate with unreachable remote address [akka.tcp://[email protected]:2552]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]

Upvotes: 0

Views: 482

Answers (1)

Toaditoad
Toaditoad

Reputation: 304

Unfortunately, I cannot reproduce the issue any more. After some trial and error it works just fine... By now, I can only take wild guesses what has changed.

I'm attaching the current code snippets for future reference. Please note that in contrast to the original post, this version serializes objects of case class Frame(videoPos: Double, frame: Mat).

configuration

akka {
  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
  actor {
    provider = remote
    warn-about-java-serializer-usage = no
    enable-additional-serialization-bindings = on
    serializers {
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
      "de.itd.actor.common.Message$Frame" = kryo
    }
    kryo  {
      kryo-custom-serializer-init = "de.itd.util.KryoInit"
      type = "nograph"
      idstrategy = "explicit"
      buffer-size = 4096
      max-buffer-size = -1
      use-manifests = true
      use-unsafe = false
      post-serialization-transformations = "lz4"
      kryo-trace = true
      resolve-subclasses = false
    }
  }
}

KryoInit

class KryoInit {
  def customize(kryo: Kryo): Unit  = {
    kryo.addDefaultSerializer(classOf[Frame], classOf[FrameKryoSerializer])
    kryo.register(classOf[Frame], 20)
  }
}

KryoSerializer

class FrameKryoSerializer extends Serializer[Frame] {
  override def write(kryo: Kryo, output: Output, frame: Frame): Unit = {
    output.writeDouble(frame.videoPos)
    val m: Mat = frame.frame
    val rows = m.rows()
    val cols = m.cols()
    val channels = m.channels()
    val matType = m.`type`()
    output.writeInt(rows)
    output.writeInt(cols)
    output.writeInt(channels)
    output.writeInt(matType)

    val bufferSize: Int = rows * cols * channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)
    m.get(0, 0, arrayByte)
    output.write(arrayByte)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[Frame]): Frame = {
    val videoFramePos = input.readDouble()

    val rows = input.readInt()
    val cols = input.readInt()
    val channels = input.readInt()
    val matType = input.readInt()

    val bufferSize = rows * cols * channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)
    val frame = new Mat(rows, cols , matType)
    input.readBytes(arrayByte)
    frame.put(0, 0, arrayByte)

    Frame(videoFramePos, frame)
  }
}

Upvotes: 0

Related Questions