Juh_

How to unit-test that a class is serializable for Spark?

I just found a bug in the serialization of one of my classes in Spark.

Now I want to write a unit test for it, but I don't see how.

Notes:

Answer by Juh_:

Looking into the Spark broadcast code, I found a way. It relies on Spark's private code, so it may stop working if Spark's internals change, but it works for now.

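Add a test class in a package whose name starts with org.apache.spark, so that it can access the private[spark] classes used below (Utils, ChunkedByteBufferOutputStream, ByteBufferInputStream), for example: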

package org.apache.spark.my_company_tests

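import java.io.SequenceInputStream
import java.nio.ByteBuffer

import scala.collection.JavaConverters._ // for asJavaEnumeration

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.{ByteBufferInputStream, Utils} // private[spark]
import org.apache.spark.util.io.ChunkedByteBufferOutputStream // private[spark]
import org.scalatest.{FlatSpec, Matchers} // ScalaTest 3.0.x; in 3.1+ use flatspec.AnyFlatSpec and matchers.should.Matchers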

/**
 * Tests that data which must be broadcast in Spark (here with Kryo)
 * survives a serialization round trip.
 */
class BroadcastSerializationTests extends FlatSpec with Matchers {

  behavior of "MyClass"

  it should "serialize a transient val, which should be lazy" in {

    val data = new MyClass(42) // data to test
    val conf = new SparkConf()


    // Serialization
    //   adapted from TorrentBroadcast.blockifyObject / unBlockifyObject, which TorrentBroadcastFactory uses
    val blockSize = 4 * 1024 * 1024 // 4 MB, the default spark.broadcast.blockSize
    val out = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
    val ser = new KryoSerializer(conf).newInstance() // KryoSerializer here; JavaSerializer can be tested the same way
    val serOut = ser.serializeStream(out)

    Utils.tryWithSafeFinally { serOut.writeObject(data) } { serOut.close() }

    // Deserialization
    val blocks = out.toChunkedByteBuffer.getChunks()
    val in = new SequenceInputStream(blocks.iterator.map(new ByteBufferInputStream(_)).asJavaEnumeration)
    val serIn = ser.deserializeStream(in)

    val data2 = Utils.tryWithSafeFinally { serIn.readObject[MyClass]() } { serIn.close() }

    // run test on data2
    data2.yo shouldBe data.yo
  }
}

class MyClass(i: Int) extends Serializable {
  @transient val yo = 1 to i // add `lazy` to make the test pass: a non-lazy @transient val is not recomputed after deserialization, so it stays uninitialized
}
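If you only need to check the `@transient` behaviour described in the `MyClass` comment, and not Spark's chunked broadcast path, a plain JDK serialization round trip may be enough. The following is a minimal sketch, not Spark's own code: it assumes Scala 2.12/2.13 and uses only `java.io` (roughly what Spark's `JavaSerializer` does, minus Spark's class-loader handling). The names `JavaRoundTrip`, `EagerTransient`, `LazyTransient` and `TransientDemo` are hypothetical. Kryo can behave differently in other cases, so the Kryo test above remains the real check.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object JavaRoundTrip {
  // Hypothetical helper: writes `obj` with JDK serialization and reads it back as a new instance.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

// Hypothetical classes with the same shape as MyClass above.
class EagerTransient(i: Int) extends Serializable {
  @transient val yo = 1 to i // skipped on write, and the constructor does not rerun on read
}

class LazyTransient(i: Int) extends Serializable {
  @transient lazy val yo = 1 to i // recomputed from the serialized `i` on first access after read
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val eager = JavaRoundTrip.roundTrip(new EagerTransient(3))
    val lzy = JavaRoundTrip.roundTrip(new LazyTransient(3))
    println(eager.yo == null)   // the eager transient val is lost
    println(lzy.yo == (1 to 3)) // the lazy one is rebuilt
  }
}
```

Under those assumptions, running `TransientDemo` prints `true` twice, which mirrors the failure and the `lazy` fix described in the `MyClass` comment.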
