Neethu Lalitha

Reputation: 3071

Spark dataset : Casting Columns of dataset

This is my dataset :

  Dataset<Row> myResult = pot.select(col("number")
                    , col("document")
                    , explode(col("mask")).as("mask"));

I now need to create a new dataset from the existing myResult, something like this:

  Dataset<Row> myResultNew = myResult.select(col("number")
                , col("name")
                , col("age")
                , col("class")
                , col("mask"));

name, age and class are derived from the document column of myResult. I guess I can call a function on the document column and do the extraction there.

  myResult.select(extract(col("document")));

  private String extract(final Column document) {
      // TODO: add the new columns name, age and class to the new dataset.
      // Parse the document and read them from it.

      XMLParser doc = (XMLParser) document; // this doesn't work
  }

My question is: document is of type Column, and I need to convert it into a different object type and parse it to extract name, age and class. How can I do that? The document is XML, and I need to parse it to get the other three columns, so I can't avoid converting it to XML.

Upvotes: 1

Views: 234

Answers (1)

werner

Reputation: 14845

Converting the extract method into a UDF would be the solution closest to what you are asking. A UDF takes the values of one or more columns as input and can execute any logic on them.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

[...]

UserDefinedFunction extract = udf(
        (String document) -> {
            List<String> result = new ArrayList<>();
            XMLParser doc = XMLParser.parse(document);
            String name = ... //read name from xml document
            String age = ... //read age from xml document
            String clazz = ... //read class from xml document
            result.add(name);
            result.add(age);
            result.add(clazz);
            return result;
         }, DataTypes.createArrayType(DataTypes.StringType)
);

A restriction of UDFs is that they can only return a single column. The function therefore returns a string array that has to be unpacked afterwards.

Dataset<Row> myResultNew = myResult
    .withColumn("extract", extract.apply(col("document"))) //1
    .withColumn("name", col("extract").getItem(0))         //2
    .withColumn("age", col("extract").getItem(1))          //2
    .withColumn("class", col("extract").getItem(2))        //2
    .drop("document", "extract");                          //3

  1. Call the UDF, passing the column that contains the XML document as the parameter of the apply function.
  2. Create the result columns from the array returned in step 1.
  3. Drop the intermediate columns.
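
The parsing that the UDF body leaves open could be filled in along these lines. This is only a sketch: XMLParser is the asker's own class and its API is unknown, so the JDK's built-in DOM parser is used instead. The class name DocumentFields, the helper text and the element names name, age and class are assumptions about what the document looks like.

```java
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: parses one XML string and returns [name, age, class].
final class DocumentFields {

    static List<String> extract(String xml) {
        try {
            // A new parser is created on every call, i.e. once per row.
            DocumentBuilder builder =
                    DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(xml)));
            // Element names are assumed; adjust them to the real document layout.
            return Arrays.asList(text(doc, "name"), text(doc, "age"), text(doc, "class"));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse document", e);
        }
    }

    // Text of the first element with the given tag name, or null if there is none.
    private static String text(Document doc, String tag) {
        NodeList nodes = doc.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }
}
```

Inside the UDF lambda, the body would then reduce to something like return DocumentFields.extract(document);, with the array positions 0, 1 and 2 matching the getItem calls above.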

Note: the UDF is executed once per row of the dataset. If creating the XML parser is expensive, this can slow down the Spark job, because one parser is instantiated per row. Due to the parallel nature of Spark, the parser cannot simply be reused for the next row. If this is an issue, another option (slightly more complex, at least in the Java world) is mapPartitions, which needs only one parser per partition of the dataset instead of one per row.
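
The one-parser-per-partition idea can be sketched in plain Java as an iterator-to-iterator function. This is not the Spark wiring itself: in a real job the same body would go into a MapPartitionsFunction passed to Dataset.mapPartitions together with an Encoder for the result type, and the rows would be Row objects rather than strings. The class name PartitionExtractor, the JDK DOM parser and the element names are assumptions for illustration.

```java
import java.io.StringReader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical sketch: one parser is created per call (per partition)
// and reused for every document the input iterator yields.
final class PartitionExtractor {

    static Iterator<List<String>> extractAll(Iterator<String> documents) {
        final DocumentBuilder builder;
        try {
            builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        } catch (Exception e) {
            throw new IllegalStateException("Could not create XML parser", e);
        }
        // Lazy iterator: each document is parsed only when it is requested.
        return new Iterator<List<String>>() {
            @Override
            public boolean hasNext() {
                return documents.hasNext();
            }

            @Override
            public List<String> next() {
                return parse(builder, documents.next());
            }
        };
    }

    private static List<String> parse(DocumentBuilder builder, String xml) {
        try {
            Document doc = builder.parse(new InputSource(new StringReader(xml)));
            // Element names are assumed; adjust them to the real document layout.
            return Arrays.asList(text(doc, "name"), text(doc, "age"), text(doc, "class"));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse document", e);
        }
    }

    // Text of the first element with the given tag name, or null if there is none.
    private static String text(Document doc, String tag) {
        NodeList nodes = doc.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }
}
```

Returning a lazy iterator avoids buffering a whole partition in memory; the trade-off is that a parse error only surfaces when the failing row is consumed.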

A completely different approach would be to use spark-xml.

Upvotes: 1
