ashwin gupta

Reputation: 11

Can't create a Priority Queue serializer

I am trying to create a Kryo serializer for mutable.PriorityQueue in Scala 2.12, but it fails with an InstantiationException and I can't get past the error. I also tried creating an object for the instantiation, but hit the same issue.

    Caused by: java.lang.InstantiationException: cPriorityQueueSerializer
    at java.base/java.lang.Class.newInstance(Class.java:671)
    at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:43)
    ... 63 more
    Caused by: java.lang.NoSuchMethodException: PriorityQueueSerializer.<init>()
    at java.base/java.lang.Class.getConstructor0(Class.java:3617)
    at java.base/java.lang.Class.newInstance(Class.java:658)
    ... 64 more

Here is the serializer definition:

class PriorityQueueSerializer[T](implicit ord: Ordering[T]) extends Serializer[mutable.PriorityQueue[T]](true, false) {

  override def write(kryo: Kryo, output: Output, pq: mutable.PriorityQueue[T]): Unit = {
    println("Write is running")
    output.writeInt(pq.size)
    pq.foreach { elem =>
      kryo.writeClassAndObject(output, elem)
    }
    kryo.writeClassAndObject(output, ord)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[mutable.PriorityQueue[T]]): mutable.PriorityQueue[T] = {
    println("Read is running")
    val pq = new mutable.PriorityQueue[T]()(ord)
    kryo.reference(pq)
    input.readStringBuilder()
    val size = input.readInt()
    for (_ <- 0 until size) {
      val elem = kryo.readClassAndObject(input).asInstanceOf[T]
      pq.enqueue(elem)
    }
    val ordering = kryo.readClassAndObject(input).asInstanceOf[Ordering[T]]
    new mutable.PriorityQueue()(ordering) ++= pq
  }
}

I am trying to do this because I am facing an NPE after upgrading Scala from 2.11 to 2.12 in Flink.

Upvotes: 1

Views: 107

Answers (1)

Didier Dupont

Reputation: 29538

There is not a lot of context on what you're doing, but your exception comes from the Kryo library trying and failing to create an instance of your serializer by calling a non-existent no-arg constructor (the constructor has one parameter, ord):

class PriorityQueueSerializer[T](implicit ord: Ordering[T]) 

That ord is an implicit argument is irrelevant. It is still an argument.
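
To see this without Kryo involved, here is a small sketch that inspects the compiled constructor through Java reflection. The names NeedsOrdering and ConstructorCheck are made up for the illustration; only the reflection calls come from the JDK.

```scala
// Hypothetical class for illustration: its only constructor parameter is implicit.
class NeedsOrdering[T](implicit ord: Ordering[T])

object ConstructorCheck {
  // Number of parameters of each public constructor, as the JVM sees them.
  def parameterCounts(c: Class[_]): List[Int] =
    c.getConstructors.toList.map(_.getParameterCount)

  // True only if a public constructor taking no arguments exists.
  def hasNoArgConstructor(c: Class[_]): Boolean =
    try { c.getConstructor(); true }
    catch { case _: NoSuchMethodException => false }

  def main(args: Array[String]): Unit = {
    println(parameterCounts(classOf[NeedsOrdering[_]]))     // List(1)
    println(hasNoArgConstructor(classOf[NeedsOrdering[_]])) // false
  }
}
```

The implicit keyword only affects how the Scala compiler fills in the argument at call sites; a reflective newInstance call has no implicit scope to draw from.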

I don't know Kryo, or whether you can control how it creates the serializer. Fortunately, you don't need to: you don't need, and shouldn't use, the constructor argument. You serialize an ordering, as you should, but you cannot count on every queue using the same one. Do not serialize the ordering from the implicit context (the serializer's constructor) but rather the one that your actual priority queue uses. I think that if you do

kryo.writeClassAndObject(output, pq.ord)

(that's pq.ord rather than ord) and get rid of the constructor argument, that should get you past the current error.
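
As a quick illustration of why the queue's own ordering matters, here is a small sketch using only the standard library. The object name QueueOrderingDemo and the sample values are made up for the example.

```scala
import scala.collection.mutable

object QueueOrderingDemo {
  // Two queues with the same elements but different orderings.
  val maxFirst: mutable.PriorityQueue[Int] = mutable.PriorityQueue(3, 1, 2) // implicit Ordering.Int
  val minFirst: mutable.PriorityQueue[Int] = mutable.PriorityQueue(3, 1, 2)(Ordering[Int].reverse)

  def main(args: Array[String]): Unit = {
    println(maxFirst.head) // 3
    println(minFirst.head) // 1

    // Each queue exposes the ordering it was built with through `ord`,
    // so a rebuilt copy can reuse exactly that ordering.
    val rebuilt = new mutable.PriorityQueue[Int]()(minFirst.ord) ++= minFirst
    println(rebuilt.head)  // 1
  }
}
```

A serializer that captured a single ordering in its constructor would rebuild both queues with the same ordering, which is wrong for at least one of them.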


Edit: a bit more than that actually, as I see that you use ord at deserialisation too:

At serialisation, serialize the ordering before the content:

kryo.writeClassAndObject(output, pq.ord)
output.writeInt(pq.size)
pq.foreach { elem =>
  kryo.writeClassAndObject(output, elem)
}

At deserialisation, first read the ordering, then create the queue, then fill it:

val ordering = kryo.readClassAndObject(input).asInstanceOf[Ordering[T]]
val pq = new mutable.PriorityQueue[T]()(ordering)
val size = input.readInt()
for (_ <- 0 until size) pq.enqueue(kryo.readClassAndObject(input).asInstanceOf[T])
pq
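
Putting both changes together, the whole serializer might look roughly like the sketch below. It is untested against Flink and assumes an older Kryo API (the read signature taking Class[T], as in the question; newer Kryo versions declare it differently). It also assumes Kryo can serialize the ordering instance itself, which may need its own registration or serializer depending on the ordering used.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import scala.collection.mutable

// No constructor parameters, so Kryo can instantiate the class reflectively.
// The default Serializer constructor leaves acceptsNull = false, so Kryo itself
// handles null queues and write() never receives null.
class PriorityQueueSerializer[T] extends Serializer[mutable.PriorityQueue[T]] {

  override def write(kryo: Kryo, output: Output, pq: mutable.PriorityQueue[T]): Unit = {
    // Ordering first: read() needs it before it can create the queue.
    kryo.writeClassAndObject(output, pq.ord)
    output.writeInt(pq.size)
    pq.foreach(elem => kryo.writeClassAndObject(output, elem))
  }

  override def read(kryo: Kryo, input: Input,
                    clazz: Class[mutable.PriorityQueue[T]]): mutable.PriorityQueue[T] = {
    // Mirror the write order exactly: ordering, size, then the elements.
    val ordering = kryo.readClassAndObject(input).asInstanceOf[Ordering[T]]
    val pq = new mutable.PriorityQueue[T]()(ordering)
    kryo.reference(pq)
    val size = input.readInt()
    for (_ <- 0 until size) pq.enqueue(kryo.readClassAndObject(input).asInstanceOf[T])
    pq
  }
}
```

Note that read consumes exactly what write produced and nothing else; any extra read call, such as the input.readStringBuilder() in the question, would shift the stream and corrupt everything read after it.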

Upvotes: 0
