Atihska

Reputation: 5126

Unable to understand UDFs in Spark and especially in Java

I am trying to create a new column in a Spark Dataset based on another column's value. The other column's value is looked up as a key in a JSON file, and the value returned for that key is the value to use for the new column.

Here is the code that I tried, but it doesn't work, and I am not sure how UDFs work either. How do you add a column in this case using withColumn or a udf?

    Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file path");

    // json-simple: parse the lookup file into a JSONObject
    Object obj = new JSONParser().parse(new FileReader("json path"));
    JSONObject jo = (JSONObject) obj;

    // Attempt to look up the existing column's value and add it as a new column
    df = df.withColumn("cluster", functions.lit(jo.get(df.col("existing col_name"))));

Any help will be appreciated. Thanks in advance!

Upvotes: 0

Views: 441

Answers (2)

Atihska

Reputation: 5126

Thanks @Constantine. I was able to better understand UDFs from your example. Here is my Java code:

    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    // json-simple: parse the lookup file into a JSONObject
    Object obj = new JSONParser().parse(new FileReader("json path"));
    JSONObject jo = (JSONObject) obj;

    // Register a UDF that uses the first five characters of the input
    // as the key and returns the matching value from the JSON file
    spark.udf().register("getJsonVal", new UDF1<String, String>() {
        @Override
        public String call(String key) {
            return (String) jo.get(key.substring(0, 5));
        }
    }, DataTypes.StringType);

    df = df.withColumn("cluster", functions.callUDF("getJsonVal", df.col("existing col_name")));
    df.show(); // shows the new "cluster" column
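
Since UDF1 has a single abstract method, the same registration can also be written with a lambda on Spark 2.x. A minimal sketch of the equivalent registration (the cast to UDF1 is needed because register() is overloaded):

    // Equivalent registration using a lambda instead of an anonymous class
    spark.udf().register("getJsonVal",
            (UDF1<String, String>) key -> (String) jo.get(key.substring(0, 5)),
            DataTypes.StringType);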

Upvotes: 0

Constantine

Reputation: 1416

Spark allows you to create custom user-defined functions (UDFs) with the udf function.

The following is a Scala snippet showing how to define a UDF.

    // json-simple: parse the lookup file into a JSONObject
    val obj = new JSONParser().parse(new FileReader("json path"))
    val jo = obj.asInstanceOf[JSONObject]

    // Plain Scala function that looks up a key in the parsed JSON
    def getJSONObject(key: String) = {
      jo.get(key)
    }

Once you have defined your function, you can convert it to a UDF as:

    val getObject = udf(getJSONObject _)

There are two approaches to using a UDF.

  1. df.withColumn("cluster", getObject(col("existing_col_name")))

  2. If you are using Spark SQL, you have to register your UDF with the sqlContext before using it.

    spark.sqlContext.udf.register("get_object", getJSONObject _)

    And then you can use it as

    spark.sql("select get_object(existing_column) from some_table")

Out of these, which one to use is completely subjective. A Java version of the second approach is sketched below.
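
For the Java API (which the question uses), the second approach looks roughly like this; a minimal sketch reusing the jo lookup object from the question and the some_table / existing_column names from above:

    // Register the UDF with Spark SQL, then call it from a SQL query
    spark.udf().register("get_object",
            (UDF1<String, String>) key -> (String) jo.get(key),
            DataTypes.StringType);

    // Expose the DataFrame to SQL under the view name used in the query
    df.createOrReplaceTempView("some_table");
    Dataset<Row> result = spark.sql("select get_object(existing_column) from some_table");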

Upvotes: 2
