jigsaw

Reputation: 165

Converting typed JavaRDD to Row JavaRDD

I am trying to convert a typed RDD to a Row RDD and then create a DataFrame from it. The code throws an exception when I execute it.

code:

JavaRDD<Counter> rdd = sc.parallelize(counters);
JavaRDD<Row> rowRDD = rdd.map((Function<Counter, Row>) RowFactory::create);

// I am using a schema here based on the class Counter
DataFrame df = sqlContext.createDataFrame(rowRDD, getSchema());
df.show(); // throws Exception

Does the conversion from a typed RDD to a Row RDD preserve the field order in the row factory? If not, how do I make sure of that?

Class code:

class Counter {
  long vid;
  byte[] bytes;
  List<B> blist;
}
class B {
  String id;
  long count;
}

schema:

private StructType getSchema() {
  List<StructField> fields = new ArrayList<>();
  fields.add(DataTypes.createStructField("vid", DataTypes.LongType, false));
  fields.add(DataTypes.createStructField("bytes", DataTypes.createArrayType(DataTypes.ByteType), false));

  List<StructField> bFields = new ArrayList<>();
  bFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
  bFields.add(DataTypes.createStructField("count", DataTypes.LongType, false));

  StructType bClassSchema = DataTypes.createStructType(bFields);

  fields.add(DataTypes.createStructField("blist", DataTypes.createArrayType(bClassSchema, false), false));
  return DataTypes.createStructType(fields);
}

It fails with this exception:

java.lang.ClassCastException: test.spark.SampleTest$A cannot be cast to java.lang.Long

    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:221)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$LongConverter$.toScalaImpl(CatalystTypeConverters.scala:367)

Upvotes: 2

Views: 5802

Answers (1)

zero323

Reputation: 330423

The thing is, there is no conversion happening here. When you create a Row it accepts an arbitrary Object, which is stored as-is. So this is not equivalent to a DataFrame creation:

spark.createDataFrame(rdd, Counter.class); 

or a Dataset<Counter> creation:

Encoder<Counter> encoder = Encoders.bean(Counter.class);
spark.createDataset(rdd, encoder);

when working with bean classes.
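
For either bean-based variant to work, Counter and B have to follow JavaBean conventions: a public no-arg constructor plus getters and setters for every field, since Spark's bean reflection reads properties, not bare fields. A minimal sketch of what that would look like, assuming you are free to reshape the question's classes:

import java.io.Serializable;
import java.util.List;

// Hypothetical bean-style rewrite of the question's classes.
public class Counter implements Serializable {
  private long vid;
  private byte[] bytes;
  private List<B> blist;

  public Counter() {} // no-arg constructor required for bean reflection

  public long getVid() { return vid; }
  public void setVid(long vid) { this.vid = vid; }

  public byte[] getBytes() { return bytes; }
  public void setBytes(byte[] bytes) { this.bytes = bytes; }

  public List<B> getBlist() { return blist; }
  public void setBlist(List<B> blist) { this.blist = blist; }
}

class B implements Serializable {
  private String id;
  private long count;

  public B() {}

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }

  public long getCount() { return count; }
  public void setCount(long count) { this.count = count; }
}

With bean-style classes, spark.createDataFrame(rdd, Counter.class) or Encoders.bean(Counter.class) can derive the schema on its own, so the hand-written getSchema() would no longer be needed.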

So RowFactory::create is simply not applicable here. If you want to pass an RDD<Row>, all values must already be represented in a form that can be consumed directly by the DataFrame under the required type mapping. That means you have to explicitly map each Counter to a Row of the following shape:

Row(vid, bytes, List(Row(id1, count1), ..., Row(idN, countN)))

and your code should be equivalent to:

JavaRDD<Row> rows = rdd.map((Function<Counter, Row>) cnt -> {
  return RowFactory.create(
    cnt.vid, cnt.bytes,
    cnt.blist.stream().map(b -> RowFactory.create(b.id, b.count)).toArray()
  );
});

Dataset<Row> df = sqlContext.createDataFrame(rows, getSchema());
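
As a quick sanity check afterwards (standard Dataset methods, nothing specific to this answer):

df.printSchema(); // should show vid: long, bytes: array<tinyint>, blist: array<struct<id,count>>
df.show();        // no more ClassCastException once the row shape matches the schema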

Upvotes: 5
