moe

Reputation: 1806

Spark mapping a Dataframe to another Dataframe

I am trying to parse a column of a dataframe into two new columns (with the aim of adding these columns to the dataframe afterwards). I tried two approaches; both led to problems. Can anyone show me how to achieve this with one of these methods, or point me to a different approach?

Map Dataset<Row> to Dataset<Tuple2<String, String>>

Dataset<Tuple2<String, String>> dfParsed =  df.map(new MapFunction<Row, Tuple2<String, String>>
            () {

        @Override
        public Tuple2<String, String> call(Row value) throws Exception {
            // Parse the column
            String opsCode = value.getAs("OPSCODE");
            String[] splitted = opsCode.split("[\\.|\\-]");

            return new Tuple2<>(splitted[1], splitted[2]);

        }
    }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

This works, but in dfParsed both columns are named value, so I cannot select them to add them to the original dataframe.

Map Dataset<Row> to Dataset<Row>

Dataset<Row> dfParsed = df.map(new MapFunction<Row, Row>() {

        @Override
        public Row call(Row value) throws Exception {
            // Parse the column
            String opsCode = value.getAs("OPSCODE");
            String[] splitted = opsCode.split("[\\.|\\-]");

            return RowFactory.create(splitted[1], splitted[2]);
        }
    }, ???);

This approach does not work because I don't know which Encoder to choose.
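For reference, the pattern `[\\.|\\-]` is a character class, so it splits on a literal `.`, `|`, or `-` (inside brackets the `|` is not an alternation operator). A minimal plain-Java check of that split behavior, using a hypothetical OPSCODE value like "5-836.30":

```java
public class OpsCodeSplit {
    public static void main(String[] args) {
        String opsCode = "5-836.30"; // hypothetical OPSCODE value
        // Character class: splits on '.', '|' or '-' ('|' is literal here)
        String[] splitted = opsCode.split("[\\.|\\-]");
        // splitted[0] = "5", splitted[1] = "836", splitted[2] = "30"
        System.out.println(splitted[1] + " " + splitted[2]); // prints "836 30"
    }
}
```

So indices 1 and 2 are the second and third pieces of the code, which is what both map attempts above extract.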

Upvotes: 0

Views: 2579

Answers (2)

moe

Reputation: 1806

It works even more easily with a selectExpr statement, because it appends the new columns to the dataframe directly (so adding them afterwards with a join or similar can be avoided).

Dataset<Row> dfParsed = df.selectExpr("*",
    "split(OPSCODE, '[\\.|\\-]')[1] as OPSCODE_CAT",
    "split(OPSCODE, '[\\.|\\-]')[2] as OPSCODE_PROC");

The SQL split function splits the column "OPSCODE" with a regular expression. The second part of the split string is stored in the new column "OPSCODE_CAT", the third in "OPSCODE_PROC".

Upvotes: 0

Justin Pihony

Reputation: 67065

You can just call toDF(columnNames) after your first attempt to rename the columns:

output.toDF("col1", "col2", ...)

Upvotes: 2
