Reputation: 119
I am implementing code to dynamically add multiple columns with null values to a DataFrame.
I found the following Scala snippet, which uses the map function of the DataFrame object:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, NullType, StructType}
import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
  val encoder = RowEncoder.apply(getSchema(df, words))
  df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, "string", false))
  schema
}
I have implemented the following two functions in Java:
private StructType getSchema(Dataset<Row> df, List<String> cols) {
    StructType schema = df.schema();
    cols.forEach(col -> schema.add(col, "int", true));
    return schema;
}

private void addColumnsViaMap(Dataset<Row> df, List<String> cols) {
    Encoder<Row> encoder1 =
        RowEncoder.apply(dataConsolidationEngine.getSchema(df, cols));
    df.map(new MapFunction<Set<String>, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Set<String> cols) throws Exception {
            // TODO Auto-generated method stub
        }
    }, encoder1);
}
The addColumnsViaMap method has a compilation error ("cannot resolve anonymous map function method") due to a parameter mismatch.
I also don't understand the Scala code of mappingRows, especially the following:
StructType => List[String] => Row => Row =
  (schema) => (words) => (row)
What does this mean? And how can I implement the above Scala code in Java?
Upvotes: 0
Views: 401
Reputation: 9308
Well, this declaration is a bit complex (and IMO a bit unreadable too), so let's step back.
In Scala, String, List... are types everyone knows of. You can make a variable of type String.
What you can also do is assign a function to a variable (this is the functional orientation of Scala), so functions also have types. Say, for example, you have a function that takes a List and outputs a String: it is of type List => String.
And what does that look like in code?
// A list of strings
val names = List("alice", "bob")
// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")
// We can assign the function to a variable
val myListToString: List[String] => String = listToString
But we have a shorter notation for declaring functions: we may declare them "inline", without using a def statement. So the above code can equivalently be written:
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
So, generically speaking:
A => B is the type of a function that takes an A and returns a B.
(arg: A) => { new B() } is an actual function that takes an instance of A as input (the instance being bound to the variable name arg) and whose body returns an instance of B.
Now let's do something crazy, let's... start over. Say that F is the type of a function that takes a List and returns a String. What would a function that takes an Int and returns an F look like?
Well, it would be of type Int => F. Substituting F with its definition gives Int => (List => String), and since => associates to the right, you can drop the parentheses: Int => List => String.
And how do you declare it?
// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")
// now we're doing it
val intToListToString = (integerValue) => myListToString
// now we're doing it in one go
val intToListToString2 = (integerValue) => (list) => list.mkString(",")
Here, intToListToString is a function that takes an Int and returns "a function that takes a List and returns a String".
And you can nest again, and again.
Until you get StructType => List[String] => Row => Row, which is a type that means: "a function that takes a StructType as input and returns (a function that takes a List[String] as input and returns (a function that takes a Row as input and returns a Row))".
And you could implement it as:
(schema) =>            // a function that takes a schema, and returns
  (words) =>           // a function that takes a list of words, and returns
    (row) =>           // a function that takes a row, and returns
      Row.fromSeq(...) // another row
Now, what would that look like in Java?
If you want to convert it strictly as it is, you may think about it this way: the natural equivalent of Scala's A => B is java.util.function.Function<A, B>. On top of that, if you want to use a function to do a Spark map operation on a Dataframe, you have to use a MapFunction<>.
So we are looking to implement a Function<StructType, Function<List<String>, MapFunction<Row, Row>>> or something of the sort.
Using Java lambda notation, you can do it this way:
schema -> words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size())))
This is a function that takes a schema, that returns a function that takes a list of words, that returns a function that takes a Row.
Maybe my Java syntax is correct, maybe not; I do not know.
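For what it's worth, here is a minimal sketch of how that curried shape could be written so it actually compiles (untested; the class name MappingRows is made up, and RowFactory.create is used instead of Row.merge, since Scala companion-object methods are awkward to call from Java):

import java.io.Serializable;
import java.util.List;
import java.util.function.Function;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructType;

public class MappingRows implements Serializable {

    // Curried shape mirroring the Scala value:
    // StructType => List[String] => Row => Row
    // (schema is unused in the body, just as in the Scala original)
    public static final
        Function<StructType, Function<List<String>, MapFunction<Row, Row>>> mappingRows =
            schema -> words -> row -> {
                // Copy the existing values; the extra slots stay null,
                // one per new column.
                Object[] values = new Object[row.size() + words.size()];
                for (int i = 0; i < row.size(); i++) {
                    values[i] = row.get(i);
                }
                return RowFactory.create(values);
            };
}

You would then obtain the mapper with MappingRows.mappingRows.apply(df.schema()).apply(words) and pass it to df.map(..., encoder).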
What I do know is that it is a vastly too complex way of achieving your requirement.
What is this requirement? You have a dataframe, you have a list of words, and you want to create new columns with those names, containing null.
So what I would have done in Scala is this:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(dataframe: DataFrame, words: List[String]): DataFrame =
  words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")
addColumnsViaMap(dataframe, words).show
+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
| a| b| null| null|
| c| d| null| null|
+-------+-------+-------+-------+
Which you can probably write in Java as such:
// needs: import static org.apache.spark.sql.functions.lit;
Dataset<Row> addColumnsViaMap(Dataset<Row> dataframe, List<String> words) {
    for (String word : words) {
        dataframe = dataframe.withColumn(word, lit((String) null));
    }
    return dataframe;
}
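A quick usage sketch, mirroring the Scala example above (untested; assumes a SparkSession named spark is in scope):

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructType;

Dataset<Row> df = spark.createDataFrame(
        Arrays.asList(RowFactory.create("a", "b"), RowFactory.create("c", "d")),
        new StructType().add("columnA", "string").add("columnB", "string"));

// prints the same table as the Scala version above,
// with columnC and columnD containing null
addColumnsViaMap(df, Arrays.asList("columnC", "columnD")).show();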
Once again, I do not have a Java-based Spark environment, but my point is: if you get the principle, rewriting it is simple.
Upvotes: 2
Reputation: 760
private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }
Simply put, that could be read as:
mappingRows is a 'function' that takes 3 parameters (of types StructType, List and Row; say schema, words and row) and that returns a Row. But instead of calling it like this:
mappingRows(schema, words, row)
you will go:
mappingRows(schema)(words)(row)
This means that calling just
mappingRows(schema)(words)
will return a function that takes a Row and returns a Row: a mapping function that you can pass to the typical .map() function.
Basically, given a schema and a list of column names, the closure takes a row as input and simply adds one null column to that row for each given column name.
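In Java terms, that partial application could look like this (an illustrative, untested sketch; df, words and encoder are assumed to be in scope, and the lambda body is just an identity placeholder):

import java.util.List;
import java.util.function.Function;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

// Hypothetical curried value; identity body just for illustration.
Function<StructType, Function<List<String>, MapFunction<Row, Row>>> mappingRows =
        s -> w -> r -> r;

// mappingRows(schema)(words) in Scala becomes two .apply() calls in Java;
// the result is exactly the Row-to-Row mapper that .map() expects.
MapFunction<Row, Row> rowMapper = mappingRows.apply(df.schema()).apply(words);
Dataset<Row> mapped = df.map(rowMapper, encoder);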
Does this help answer your question?
Upvotes: 0