Reputation: 145
I have a custom Writable class in Hadoop which is saved as a SequenceFile as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class ABC implements Writable {
    private byte[] myId;
    private byte[] myType;

    // Constructor and other methods

    @Override
    public void write(DataOutput out) throws IOException {
        // length-prefix each array so readFields knows how much to read back
        out.writeInt(myId.length);
        out.write(myId);
        out.writeInt(myType.length);
        out.write(myType);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        myId = new byte[in.readInt()];
        in.readFully(myId);
        myType = new byte[in.readInt()];
        in.readFully(myType);
    }
}
I would like to use PySpark to read this SequenceFile and get the data. I have tried the following three ways:
sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC" )
but get
object not serializable (class: ABC, value: ABC@451de3ec)
Following the official programming guide, http://spark.apache.org/docs/latest/programming-guide.html#external-datasets, which says:
If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler. A Converter trait is provided for this. Simply extend this trait and implement your transformation code in the convert method.
Thus I implement a converter as follows:
import test.ABC
import java.io.DataInput
import org.apache.spark.api.python.Converter

/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts
 * the data to ABC
 */
class DataToABCConverter extends Converter[Any, ABC] {
  override def convert(obj: Any): ABC = {
    if (obj == null) {
      return null
    }
    val in = obj.asInstanceOf[DataInput]
    val abc = new ABC()
    abc.readFields(in)
    abc
  }
}
And in PySpark I use the following code:
sc.sequenceFile("file:///Test.seq", keyClass="ABC", valueClass="ABC", keyConverter="DataToABCConverter", valueConverter="DataToABCConverter")
but get the following error:
java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput
It seems the input to the converter is already my ABC class, not java.io.DataInput, so I cannot apply the readFields method to get the data.
So I add a getID() method to get the bytes and change the converter as follows:
import org.apache.hadoop.io.BytesWritable

class DataToChunkConverter extends Converter[Any, BytesWritable] {
  override def convert(obj: Any): BytesWritable = {
    if (obj == null) {
      return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    new BytesWritable(idd)
  }
}
Then I run PySpark using:
pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
but get the following error:
Failed to pickle Java object as value: BytesWritable, falling back to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable
So my question is: what is the correct way to read a custom SequenceFile in PySpark? And what types are serializable through PySpark? Any suggestions are appreciated!
Upvotes: 0
Views: 1649
Reputation: 145
After some experiments (following the third method), it turns out that it works if native Scala or Java types are used as the converter's return type.
For instance, using Array[Byte] as the return type, PySpark can successfully get the data:
class DataToChunkConverter extends Converter[Any, Array[Byte]] {
  override def convert(obj: Any): Array[Byte] = {
    if (obj == null) {
      return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    idd
  }
}
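For completeness, here is a minimal sketch of the PySpark side, assuming the converter is compiled into a jar that is put on the classpath (e.g. with --jars); the jar name and the unqualified class names are just placeholders for however your classes are packaged:
# launched e.g. with: pyspark --jars converters.jar  (jar name is a placeholder)
rdd = sc.sequenceFile(
    "file:///Test.seq",
    keyClass="ABC",
    valueClass="ABC",
    keyConverter="DataToChunkConverter",
    valueConverter="DataToChunkConverter")

# Each record arrives as a (key, value) pair of plain byte sequences,
# since Pyrolite's pickler can handle Array[Byte] directly.
for key, value in rdd.take(5):
    print(len(key), len(value))
The only change compared with the earlier attempts is that the converter now hands Pyrolite a plain Array[Byte] instead of a custom Writable or a BytesWritable.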
Upvotes: 1