Reputation: 375
I'm trying to use the DataFrame map function on an arbitrary dataset. However, I don't understand how you would map from Row to Row. No examples are given for arbitrary data in the Spark SQL documentation:
Dataset<Row> original_data = ...
Dataset<Row> changed_data = original_data.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        Row newRow = RowFactory.create(obj1, obj2);
        return newRow;
    }
}, Encoders.bean(Row.class));
However, this does not work, since there needs to be some sort of Encoder. How can I map to a generic Row?
Upvotes: 2
Views: 5069
Reputation: 5834
If obj1 and obj2 are not primitive types, represent their schema as a StructType and use it to create a Row encoder. Instead of using the Row type, though, I would suggest creating a custom bean that stores both obj1 and obj2, and then using that custom bean's encoder in the map transformation.
Row type:
StructType customStructType = new StructType();
customStructType = customStructType.add("obj1", DataTypes.<type>, false);
customStructType = customStructType.add("obj2", DataTypes.<type>, false);
ExpressionEncoder<Row> customTypeEncoder = RowEncoder.apply(customStructType);

Dataset<Row> changed_data = original_data.map(
        (MapFunction<Row, Row>) row -> RowFactory.create(obj1, obj2),
        customTypeEncoder);
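For example, here is a minimal self-contained sketch of the Row-to-Row approach, assuming the input rows have a String "name" column and an int "age" column (the column names and the transformation are placeholders for illustration):

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema of the rows produced by the map (assumed columns "name" and "age")
StructType outputSchema = new StructType()
        .add("name", DataTypes.StringType, false)
        .add("age", DataTypes.IntegerType, false);

Dataset<Row> changed_data = original_data.map(
        (MapFunction<Row, Row>) row -> RowFactory.create(
                // build the new Row from values of the incoming Row
                row.getString(row.fieldIndex("name")).toUpperCase(),
                row.getInt(row.fieldIndex("age")) + 1),
        RowEncoder.apply(outputSchema));

Note that the cast to MapFunction<Row, Row> is needed in Java because Dataset.map is overloaded for both the Scala and Java function interfaces, and a bare lambda is otherwise ambiguous.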
Custom Bean type:
class CustomBean implements Serializable {
    Object obj1;
    Object obj2;
    // no-arg constructor, getters and setters
}
Dataset<CustomBean> changed_data = original_data.map(
        (MapFunction<Row, CustomBean>) row -> new CustomBean(obj1, obj2),
        Encoders.bean(CustomBean.class));
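Encoders.bean expects a JavaBean-style class: a public no-argument constructor plus getters and setters for every field (implementing Serializable is also common practice). A minimal sketch, with assumed field types and column positions:

import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

public class CustomBean implements Serializable {
    private String obj1;
    private Integer obj2;

    public CustomBean() { }  // no-arg constructor required by Encoders.bean
    public CustomBean(String obj1, Integer obj2) {
        this.obj1 = obj1;
        this.obj2 = obj2;
    }

    public String getObj1() { return obj1; }
    public void setObj1(String obj1) { this.obj1 = obj1; }
    public Integer getObj2() { return obj2; }
    public void setObj2(Integer obj2) { this.obj2 = obj2; }
}

// Usage: build each CustomBean from the incoming Row (column positions assumed)
Dataset<CustomBean> changed_data = original_data.map(
        (MapFunction<Row, CustomBean>) row -> new CustomBean(row.getString(0), row.getInt(1)),
        Encoders.bean(CustomBean.class));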
Upvotes: 2