Bitswazsky

Reputation: 4698

How to resolve NoSuchMethodException in java when calling custom spark UDF

I have a Java Spark streaming app (Spark 3.0.1) that needs to apply custom transformations to streaming data. These transformations are defined as methods of a class and delivered to us as a jar file. We need to wrap these methods in UDFs and call them from our Spark code. A sample set of transformations is shown below; keep in mind that it arrives as a jar.

import java.io.Serializable;

public class CustomTransformations implements Serializable {
    public String f1(String input) {
        return input + "_1";
    }

    public String f2(String input) {
        return input + "_2";
    }

    public String f3(String input) {
        return input + "_3";
    }
}

Assume that somewhere (e.g. a JSON or config file) we have a map from each transformation to the name of its method (a String), so that, given a transformation, we can wrap the corresponding method in a UDF and invoke it. I created the following class for this purpose.

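import java.lang.reflect.Method;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;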

public class Creator {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException {
        Method method = ct.getClass().getDeclaredMethod(funcName);
        return udf(
                (UDF1<String, Object>) method::invoke, DataTypes.StringType);
    }
}

This compiles without errors. However, when I call getUDF from the Spark code, it throws a NoSuchMethodException. My Spark code looks something like this:

public class SampleSparkJob {
    public static void main(String[] args) {
        SparkSession.Builder sparkSessionBuilder = SparkSession.builder()
                .master("local[2]")
                .appName("sample-streaming");

        CustomTransformations ct = new CustomTransformations();
        try (SparkSession spark = sparkSessionBuilder.getOrCreate()) {
            Dataset<Row> df1 = MyKafkaConnectors.readFromKafka();

            // this is where I get the exceptions
            Dataset<Row> df2 = df1
                    .withColumn("value", Creator.getUDF(ct, "f1").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f2").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f3").apply(col("value")));

            StreamingQuery query = MyKafkaConnectors.WriteToKafka(df2);
            query.awaitTermination();
        } catch (TimeoutException | StreamingQueryException | NoSuchMethodException e) {
            e.printStackTrace();
        }
    }
}

This is the error that I get:

java.lang.NoSuchMethodException: <pkgname>.CustomTransformations.f1()
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
    at Creator.getUDF(Creator.java:14)
    at SampleSparkJob.main(SampleSparkJob.java:29)

The package name is correct, and the client's CustomTransformations class clearly has a method f1, so I don't understand why this error occurs. Any help is appreciated.

Upvotes: 0

Views: 514

Answers (2)

Bitswazsky

Reputation: 4698

I have only one point to add to the precise answer given by @fonkap. Because java.lang.reflect.Method is not serializable, getUDF must avoid capturing a Method object inside the UDF. The Creator class then looks something like this:

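import java.io.Serializable;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;

public class Creator implements Serializable {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName) {
        // Look the method up inside the lambda so that only ct and funcName
        // (both serializable) are captured, never a Method object.
        return udf((UDF1<String, Object>) (s -> ct.getClass().getDeclaredMethod(funcName,
                String.class).invoke(ct, s)),
                DataTypes.StringType);
    }
}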
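
To see why capturing the Method is a problem, here is a small Spark-free sketch. StringFn is a hypothetical stand-in for Spark's UDF1 (which also extends Serializable), and MethodSerializationDemo, canSerialize, capturingMethod and lookingUpLazily are names invented for this example. It uses plain Java serialization only, so it approximates what happens when Spark ships the UDF rather than reproducing it exactly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

public class MethodSerializationDemo {
    // Hypothetical stand-in for Spark's UDF1: a serializable one-argument function.
    public interface StringFn extends Serializable {
        Object call(String s) throws Exception;
    }

    // Stand-in for the client's class from the question.
    public static class CustomTransformations implements Serializable {
        public String f1(String input) {
            return input + "_1";
        }
    }

    // Returns true if plain Java serialization accepts the object.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    // The lambda captures a Method, which does not implement Serializable.
    public static StringFn capturingMethod(CustomTransformations ct, String name) {
        try {
            Method m = ct.getClass().getDeclaredMethod(name, String.class);
            return s -> m.invoke(ct, s);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // The lambda captures only ct and name; the lookup happens on each call.
    public static StringFn lookingUpLazily(CustomTransformations ct, String name) {
        return s -> ct.getClass().getDeclaredMethod(name, String.class).invoke(ct, s);
    }

    public static void main(String[] args) throws Exception {
        CustomTransformations ct = new CustomTransformations();
        System.out.println(Serializable.class.isAssignableFrom(Method.class)); // false
        System.out.println(canSerialize(capturingMethod(ct, "f1")));           // false
        System.out.println(canSerialize(lookingUpLazily(ct, "f1")));           // true
        System.out.println(lookingUpLazily(ct, "f1").call("abc"));             // abc_1
    }
}
```

The lazy version repeats the reflective lookup for every row. If that cost matters, one option would be a serializable wrapper that caches the Method in a transient field, but I have not measured whether it is needed here.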

Upvotes: 1

fonkap

Reputation: 2509

Your code has two problems, both unrelated to Spark.

First, NoSuchMethodException: CustomTransformations.f1() is telling you that there is no method f1 that takes no parameters. This is true.

You need to pass the parameter types to getDeclaredMethod (String.class in this case).

Second, a Method cannot be invoked on its own: you need to pass the owning object (the "this" instance) as the first argument to invoke.

The fixed Creator then looks like this:

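import java.lang.reflect.Method;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.udf;

public class Creator {
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException {
        // Look up funcName(String) and invoke it on ct for each input value.
        Method method = ct.getClass().getDeclaredMethod(funcName, String.class);
        return udf(
                (UDF1<String, Object>) (s -> method.invoke(ct, s)), DataTypes.StringType);
    }
}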

With these two changes, the NoSuchMethodException is resolved.
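
Both points can be checked without Spark. Here is a minimal sketch, assuming a stand-in CustomTransformations that has only f1; the ReflectionLookupDemo class and its isDeclared and call helpers are hypothetical names made up for illustration.

```java
import java.lang.reflect.Method;

public class ReflectionLookupDemo {
    // Stand-in for the client's class from the question.
    public static class CustomTransformations {
        public String f1(String input) {
            return input + "_1";
        }
    }

    // True if the class declares a method with this name and exactly these parameter types.
    public static boolean isDeclared(Class<?> type, String name, Class<?>... paramTypes) {
        try {
            type.getDeclaredMethod(name, paramTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Looks up name(String) and invokes it on target, passing target as the receiver.
    public static Object call(Object target, String name, String arg) {
        try {
            Method m = target.getClass().getDeclaredMethod(name, String.class);
            return m.invoke(target, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CustomTransformations ct = new CustomTransformations();
        System.out.println(isDeclared(ct.getClass(), "f1"));               // false: no f1()
        System.out.println(isDeclared(ct.getClass(), "f1", String.class)); // true: f1(String)
        System.out.println(call(ct, "f1", "abc"));                         // abc_1
    }
}
```

The first lookup fails for the same reason as in the question: getDeclaredMethod matches on the name and the full parameter list, so "f1" alone means f1().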

Upvotes: 1
