vkumar22

Reputation: 119

How to implement the following Scala code snippet in Java?

I am writing code to dynamically add multiple columns to a DataFrame, with null values in each row.

I found the following Scala snippet, which uses the map function of the DataFrame object.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
   val encoder = RowEncoder.apply(getSchema(df, words))
   df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, "string", false))
  schema
}

I have implemented the following two functions in Java:

private StructType getSchema(Dataset<Row> df, List<String> cols) {
    StructType schema = df.schema();
    // StructType is immutable: add() returns a new schema, so it must be reassigned
    for (String col : cols) {
        schema = schema.add(col, "int", true);
    }
    return schema;
}

private Dataset<Row> addColumnsViaMap(Dataset<Row> df, List<String> cols) {
    Encoder<Row> encoder1 = RowEncoder.apply(dataConsolidationEngine.getSchema(df, cols));
    return df.map(new MapFunction<Set<String>, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Set<String> cols) throws Exception {
            // TODO Auto-generated method stub
        }
    }, encoder1);
}

The addColumnsViaMap method has a compilation error: the anonymous map function cannot be resolved, due to a parameter mismatch.

I also don't understand the Scala code of mappingRows, especially the following: StructType => List[String] => Row => Row = (schema) => (words) => (row). What does this mean?

And how do I implement the above Scala code in Java?

Upvotes: 0

Views: 401

Answers (2)

GPI

Reputation: 9308

Well, this declaration is a bit complex (and IMO a bit unreadable too), so let's step back.

In Scala, String, List... are types everyone knows of. You can make a variable of type String.

What you can also do is assign a function to a variable (this is the functional orientation of Scala), so functions also have types. Say, for example, you have a function that takes a List and outputs a String; it is of type List => String.

And what does that look like in code?

// A list of strings
val names = List("alice", "bob")
// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")
// We can assign the function to a variable
val myListToString: List[String] => String = listToString

But we have a shorter notation for declaring functions: we may declare them "inline", without using a def statement, so that the above code can equivalently be written:

val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")

So, generically speaking:

  • A => B is a type: that of a function that takes an A and returns a B
  • (arg: A) => { new B() } is an actual function that takes an instance of A as input (the instance being bound to the variable name arg) and whose body returns an instance of B
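
As an aside for Java readers (my addition, to bridge toward the Java version below): a type like List[String] => String corresponds roughly to java.util.function.Function<List<String>, String>. A minimal sketch:

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Roughly the Java counterpart of: val myListToString: List[String] => String
Function<List<String>, String> myListToString = list -> String.join(",", list);
String joined = myListToString.apply(Arrays.asList("alice", "bob")); // "alice,bob"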

Now let's do something crazy, let's... start over. Say that F is the type of a function that takes a List and returns a String. What would a function that takes an Int and returns an F look like?

Well, it would be:

  • Int => F
  • That is to say: Int => (List => String)
  • Which can be written Int => List => String

And how do you declare it?

// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
// now we're doing it (the parameter type must be annotated here)
val intToListToString = (integerValue: Int) => myListToString
// now we're doing it in one go
val intToListToString2 = (integerValue: Int) => (list: List[String]) => list.mkString(",")

Here, intToListToString is a function that takes an int and returns "a function that takes a List and returns a String".
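
The same nesting can be written with Java lambdas; here is a minimal Java sketch of the Int => List => String shape (again, an aside for Java readers):

// Roughly: Int => (List[String] => String), as nested Functions
Function<Integer, Function<List<String>, String>> intToListToString =
    integerValue -> list -> String.join(",", list);
// Applied one argument at a time:
String s = intToListToString.apply(42).apply(Arrays.asList("alice", "bob")); // "alice,bob"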

And you can nest again, and again.

Until you get StructType => List[String] => Row => Row, which is a type that means "a function that takes a StructType as input and returns (a function that takes a List[String] as input and returns (a function that takes a Row as input and returns a Row))".

And you could implement it as :

(schema) => // a function that takes schema, and returns
    (words) => // a function that takes a list of words and returns
        (row) => // a function that takes a row and returns
            Row.fromSeq(...) // another row

Now what would that look like in Java?

If you want to convert it strictly as it is, you may think about it this way: the natural equivalent of Scala's A => B is java.util.function.Function<A, B>. On top of that, if you want to use a function to perform a Spark map operation on a DataFrame, you have to use a MapFunction<>.

So we are looking to implement a Function<StructType, Function<List<String>, MapFunction<Row, Row>>> or something of the sort.

Using Java lambda notation, you can do it this way:

schema -> words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size())))

Which is a function that takes a schema,

  • that returns a function that takes a list of words

    • that returns a function that takes a Row

      • that returns a row augmented with columns containing null

Maybe my Java syntax is correct, maybe not; I do not know.
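
For what it's worth, here is a sketch of how a compiling version of that idea might look, assuming Spark's public Java API. Treat it as a best guess, not tested code: RowFactory stands in for Row.merge / Row.fromSeq, which take Scala types and are awkward to call from Java.

import java.io.Serializable;
import java.util.List;
import java.util.function.Function;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;

public class AddColumnsViaMap implements Serializable {

    // StructType => List[String] => Row => Row, as nested Java functions
    // (schema is unused in the body, just like in the Scala original)
    private static final Function<StructType, Function<List<String>, MapFunction<Row, Row>>> mappingRows =
        schema -> words -> row -> {
            Object[] values = new Object[row.size() + words.size()];
            for (int i = 0; i < row.size(); i++) {
                values[i] = row.get(i); // copy the existing columns
            }
            // the trailing slots stay null, one per added column
            return RowFactory.create(values);
        };

    static Dataset<Row> addColumnsViaMap(Dataset<Row> df, List<String> words) {
        StructType schema = df.schema();
        for (String word : words) {
            schema = schema.add(word, "string", true); // extend the schema for the encoder
        }
        return df.map(mappingRows.apply(df.schema()).apply(words), RowEncoder.apply(schema));
    }
}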

What I do know is that it is a far too complex way of achieving your requirement.

What is this requirement: you have a dataframe, you have a list of words, and you want to create new columns with those names, containing null.

So what I would have done in Scala is this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(dataframe: DataFrame, words: List[String]): DataFrame =
  words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))

// assuming spark.implicits._ is in scope (e.g. in spark-shell) for toDF
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")
addColumnsViaMap(dataframe, words).show

+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
|      a|      b|   null|   null|
|      c|      d|   null|   null|
+-------+-------+-------+-------+

Which you can probably write in Java as such:

import static org.apache.spark.sql.functions.lit;

Dataset<Row> addColumnsViaMap(Dataset<Row> dataframe, List<String> words) {
    for (String word : words) {
        dataframe = dataframe.withColumn(word, lit((String) null));
    }
    return dataframe;
}
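
Hypothetical usage, mirroring the Scala example above (dataframe is assumed to already exist with columns columnA and columnB):

List<String> words = Arrays.asList("columnC", "columnD");
dataframe = addColumnsViaMap(dataframe, words);
dataframe.show(); // columnC and columnD appear, filled with null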

Once again, I do not have a Java-based Spark environment at hand, but my point is: if you get the principle, rewriting it is simple.

Upvotes: 2

Highbrainer

Reputation: 760

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

Simply put, that could be read as :

mappingRows is a 'function' that takes 3 parameters (of types StructType, List and Row, say schema, words and row) and that returns a Row. But instead of calling it like this:

mappingRows(schema, words, row)

you will go

mappingRows(schema)(words)(row)

This means that calling just

mappingRows(schema)(words)

will return a function that takes a Row and returns a Row: a mapping function that you can pass to the typical .map() function.

Basically, given a schema and a list of col names, the closure takes a row as input and simply adds one null column to that row for each given col name.
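
In Java terms, with a curried mappingRows built from nested java.util.function.Function instances as in the other answer, that partial application is just a chain of apply calls (a sketch; mappingRows, df, words and encoder are assumed from context):

// Apply the first two 'parameter lists' up front...
Function<List<String>, MapFunction<Row, Row>> withWords = mappingRows.apply(df.schema());
MapFunction<Row, Row> rowMapper = withWords.apply(words); // the Row => Row part
// ...then hand the remaining Row => Row function to .map()
Dataset<Row> result = df.map(rowMapper, encoder);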

Does that help answer your question?

Upvotes: 0
