Light

Reputation: 375

Using map on dataset with arbitrary rows in Spark SQL

I'm trying to use the Dataset map function on an arbitrary dataset of rows. However, I don't understand how you would map from Row -> Row. The Spark SQL documentation gives no examples for arbitrary row data:

Dataset<Row> original_data = ...
Dataset<Row> changed_data = original_data.map(new MapFunction<Row, Row>() {
            @Override
            public Row call(Row row) throws Exception {
                Row newRow = RowFactory.create(obj1, obj2);
                return newRow;
            }
}, Encoders.bean(Row.class));

However, this does not work, since there apparently needs to be some sort of encoder. How can I map to a generic Row?

Upvotes: 2

Views: 5069

Answers (1)

Rahul Sharma

Reputation: 5834

If obj1 and obj2 are not primitive types, describe their schema as a StructType to create a Row encoder. Instead of using the Row type, though, I would suggest creating a custom bean which stores both obj1 and obj2, then using that custom bean's encoder in the map transformation.

Row type:

StructType customStructType = new StructType()
        .add("obj1", DataTypes.<type>, false)
        .add("obj2", DataTypes.<type>, false);

Dataset<Row> changed_data = original_data.map(
        (MapFunction<Row, Row>) row -> RowFactory.create(obj1, obj2),
        RowEncoder.apply(customStructType));
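
For concreteness, here is a minimal sketch of the Row-encoder approach, assuming (purely for illustration) that the output has a String column "obj1" and an Integer column "obj2" taken from the first two columns of the input row; substitute your own column names and DataTypes:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema of the rows produced by the map: two non-nullable columns.
StructType schema = new StructType()
        .add("obj1", DataTypes.StringType, false)
        .add("obj2", DataTypes.IntegerType, false);

// The cast to MapFunction picks the Java overload of map over the Scala one.
Dataset<Row> changed_data = original_data.map(
        (MapFunction<Row, Row>) row -> RowFactory.create(
                row.getString(0),   // obj1 copied from the first input column
                row.getInt(1)),     // obj2 copied from the second input column
        RowEncoder.apply(schema));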

Custom Bean type:

class CustomBean implements ....{
    Object obj1;
    Object obj2;
....
}

Dataset<CustomBean> changed_data = original_data.map(
        (MapFunction<Row, CustomBean>) row -> new CustomBean(obj1, obj2),
        Encoders.bean(CustomBean.class));
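
Note that Encoders.bean expects a publicly accessible Java bean: a class with a no-argument constructor and getter/setter pairs for every field it should serialize. A minimal sketch of such a bean, again assuming String/Integer fields for illustration:

public class CustomBean implements java.io.Serializable {
    private String obj1;
    private Integer obj2;

    public CustomBean() { }    // no-arg constructor required by the bean encoder

    public CustomBean(String obj1, Integer obj2) {
        this.obj1 = obj1;
        this.obj2 = obj2;
    }

    public String getObj1() { return obj1; }
    public void setObj1(String obj1) { this.obj1 = obj1; }

    public Integer getObj2() { return obj2; }
    public void setObj2(Integer obj2) { this.obj2 = obj2; }
}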

Upvotes: 2
