Reputation: 5824
What is the exact translation of the Scala code snippet below into Java?
import org.apache.spark.sql.functions.udf

def upper(s: String): String = {
  s.toUpperCase
}

val toUpper = udf(upper _)
peopleDS.select(peopleDS("name"), toUpper(peopleDS("name"))).show
Please fill in the missing statement in the Java code below:
import org.apache.spark.sql.api.java.UDF1;
UDF1<String, String> toUpper = new UDF1<String, String>() {
    public String call(final String str) throws Exception {
        return str.toUpperCase();
    }
};
peopleDS.select(peopleDS.col("name"), /* how to call toUpper on column "name" here? */).show();
Registering the UDF and then calling it via selectExpr works for me, but I need something similar to the approach shown above.
Working example:
sqlContext.udf().register(
    "toUpper",
    (String s) -> s.toUpperCase(),
    DataTypes.StringType
);
peopleDF.selectExpr("toUpper(name)","name").show();
Upvotes: 4
Views: 11353
Reputation: 15297
In Java, calling a UDF without registering it is not possible. Please check the following discussion:
Below is your UDF:
private static UDF1<String, String> toUpper = new UDF1<String, String>() {
    public String call(final String str) throws Exception {
        return str.toUpperCase();
    }
};
Register the UDF, and then you can call it with the callUDF function:
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
sqlContext.udf().register("toUpper", toUpper, DataTypes.StringType);
peopleDF.select(col("name"),callUDF("toUpper", col("name"))).show();
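As a side note, on newer Spark versions (2.3 and later) it is also possible to skip name-based registration entirely: functions.udf can wrap a Java UDF1 into a UserDefinedFunction that is applied directly to columns, much like the Scala version in the question. This is a sketch under that version assumption, reusing the peopleDF and "name" column from above.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

// Wrap the UDF1 into a UserDefinedFunction; no registration by name needed.
UserDefinedFunction toUpperFn = udf(
    (UDF1<String, String>) str -> str.toUpperCase(),
    DataTypes.StringType
);

// Apply it directly, analogous to the Scala toUpper(peopleDS("name")):
peopleDF.select(col("name"), toUpperFn.apply(col("name"))).show();
```

The trade-off is that selectExpr and SQL strings still require a registered name, while apply-style calls work only with the UserDefinedFunction handle.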
Upvotes: 7
Reputation: 9
Input CSV:
+-------+--------+------+
| name| address|salary|
+-------+--------+------+
| Arun| Indore| 1|
|Shubham| Indore| 2|
| Mukesh|Hariyana| 3|
| Arun| Bhopal| 4|
|Shubham|Jabalpur| 5|
| Mukesh| Rohtak| 6|
+-------+--------+------+
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
    SparkSession sparkSession = new SparkSession(new SparkContext(sparkConf));
    Dataset<Row> dataset = sparkSession.read().option("header", "true")
            .csv("C:\\Users\\Desktop\\Spark\\user.csv");

    /** Create the UDF */
    UDF1<String, String> toLower = new UDF1<String, String>() {
        @Override
        public String call(String str) throws Exception {
            return str.toLowerCase();
        }
    };

    /** Register the UDF */
    sparkSession.udf().register("toLower", toLower, DataTypes.StringType);

    /** Call the UDF using functions.callUDF */
    dataset.select(dataset.col("name"), dataset.col("salary"),
            functions.callUDF("toLower", dataset.col("address")).alias("address")).show();
}
Output :
+-------+------+--------+
| name|salary| address|
+-------+------+--------+
| Arun| 1| indore|
|Shubham| 2| indore|
| Mukesh| 3|hariyana|
| Arun| 4| bhopal|
|Shubham| 5|jabalpur|
| Mukesh| 6| rohtak|
+-------+------+--------+
Upvotes: 1