steve
steve

Reputation: 145

Reading Custom Sequence File in Spark

I have a custom writable class in Hadoop which is saved as sequencefile as follows

   public class ABC implements Writable{
    private byte[] myId;
    private byte[] myType;

    //Constructor and other methods
    @Override
    public void write(DataOutput out) throws IOException {
        myId.write(out);
        myType.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        myId.readFields(in);
        myType.readFields(in);
    }
}

And I would like to use PySpark to read the sequencefile and get the data. I have tried the following three ways:

  1. Directly reading:

sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC" )

but get

object not serializable (class: ABC, value: ABC@451de3ec)
  1. Writing convertor:

Following from official tutorial http://spark.apache.org/docs/latest/programming-guide.html#external-datasets, which says

If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler. A Converter trait is provided for this. Simply extend this trait and implement your transformation code in the convert method.

Thus I implement a converter as follows:

import test.ABC
import java.io.DataInput
import org.apache.spark.api.python.Converter

/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts data
 * to ABC
 */
class DataToABCConverter extends Converter[Any, ABC] {
  override def convert(obj: Any): ABC = {
    if (obj == null) {
      return null
    }
    val in = obj.asInstanceOf[DataInput]
    val abc = new ABC()
    abc.readFields(in)
    abc
  }
}

And in PySpark I use the following code

sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC",  keyConverter="DataToABCConverter",  valueConverter="DataToABCConverter" )

But get the following errors

java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput

It seems like the input of converter is my ABC class not java.io.DataInput, so that I can not apply the readFields method to get the data.

  1. Using BytesWritable:

I add an geID() method to get the byets and change the converter as follows:

class DataToChunkConverter extends Converter[Any, BytesWritable] {
  override def convert(obj: Any): BytesWritable = {
    if (obj == null) {
      return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    new BytesWritable(idd)
  }
}

Than I run the pyspark using

pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

But get the following errors

Failed to pickle Java object as value: BytesWritable, falling back
to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable

So my question is what is the correct way to read the custom sequencefile in PySpark? An what types is serializable through PySpark? Any suggestions is appreciated!!

Upvotes: 0

Views: 1649

Answers (1)

steve
steve

Reputation: 145

After some experiments(following the third method), it turns out that it works if the native types in scala or Java are used as the return types of converter.

For instance, using the Array[Byte] as return types, the Pyspark can successfully get the data:

 class DataToChunkConverter extends Converter[Any,  Array[Byte]] {
  override def convert(obj: Any):  Array[Byte] = {
    if (obj == null) {
      return null
    }
    val abc = obj.asInstanceOf[ABC] 
    val idd = abc.getID()
    idd
  }
}

Upvotes: 1

Related Questions