AlexF

Reputation: 21

Spark CompileException in dataset.groupByKey()

I'm trying to use Spark to extract zip code prefixes, and the code Spark generates fails to compile because it tries to construct a java.lang.Double from an org.apache.spark.unsafe.types.UTF8String. It's not clear to me whether this is a bug in Spark or in how I'm using it. I'm using Java 1.8 and spark-mllib_2.10 in local mode. The failing code:

public static void read(Dataset<ZipCodeLatLon> zipCodes) {
    zipCodes.groupByKey(new MapFunction<ZipCodeLatLon, String>() {
        @Override
        public String call(ZipCodeLatLon value) throws Exception {
            return value.getZip().substring(0, 3);
        }
    }, Encoders.STRING()).keys().show();
}

Results in

Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 58: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "java.lang.Double(double)", "java.lang.Double(java.lang.String)"

The generated code is quite long so I won't put the entire thing in here, but the key section that is causing it to fail is:

private UTF8String argValue;
final alex.floyd.lc.geocode.ZipCodeLatLon value1 = false ? null : new alex.floyd.lc.geocode.ZipCodeLatLon();
...
public java.lang.Object apply(java.lang.Object _i) {
...
    resultIsNull = false;
    if (!resultIsNull) {
        boolean isNull3 = i.isNullAt(1);
        UTF8String value3 = isNull3 ? null : (i.getUTF8String(1));
        resultIsNull = isNull3;
        argValue = value3;
    }

    final java.lang.Double value2 = resultIsNull ? null : new java.lang.Double(argValue);
    javaBean.setLat(value2);
...
}

The error seems independent of the return type of the groupByKey function (I tried Integer and a Java bean instead of String). However, if I change the input Dataset type to something else, such as String instead of ZipCodeLatLon, the code works. As far as I can tell, ZipCodeLatLon follows all the required Java bean conventions, so I'm not sure what I would need to change. I also used Spark to read ZipCodeLatLon in from a CSV, so Spark can clearly handle the class; just not in the context of a groupByKey call.

public class ZipCodeLatLon implements Serializable {
    private String zip;
    private Double lat;
    private Double lng;

    public String getZip() {
        return zip;
    }
    public void setZip(String zip) {
        this.zip = zip;
    }
    public Double getLat() {
        return lat;
    }
    public void setLat(Double lat) {
        this.lat = lat;
    }
    public Double getLng() {
        return lng;
    }
    public void setLng(Double lng) {
        this.lng = lng;
    }
}

Some additional information: This seems to be related to how ZipCodeLatLon is read in from a CSV. When manually creating a Dataset the code works fine.

Totally fine:

ZipCodeLatLon l = new ZipCodeLatLon();
l.setZip("12345");
l.setLat(0.0);
l.setLng(0.0);
read(spark.createDataset(Lists.newArrayList(l, l), Encoders.bean(ZipCodeLatLon.class)));

Totally broken:

Dataset<ZipCodeLatLon> dataset = spark.read()
    .option("header", true)
    .csv(zipCodeData.getAbsolutePath())
    .as(Encoders.bean(ZipCodeLatLon.class));
dataset.show(); // works - reading in the CSV succeeds
read(dataset); // fails on groupByKey

Upvotes: 1

Views: 817

Answers (1)

AlexF

Reputation: 21

Figured it out. You need to give the CSV reader a schema (or tell it to infer one). I assumed the encoder would provide the schema, but apparently it doesn't. Wish the error message were more useful!

Before:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) {
    return spark.read()
            .option("header", true)
            .csv(ZIP_CODE_DATA.getAbsolutePath())
            .as(Encoders.bean(ZipCodeLatLon.class));
}
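For context on why the Before version fails (a hedged note, not from the original post): without a schema, Spark's CSV reader types every column as string, so the generated bean-encoder code ends up calling new java.lang.Double(UTF8String), which matches no Double constructor. Printing the reader's schema before applying the encoder makes this visible:

```java
// Diagnostic sketch: inspect what the CSV reader produced before the
// bean encoder is applied. Without a schema (and without inferSchema),
// every column comes back typed as a plain string.
spark.read()
        .option("header", true)
        .csv(ZIP_CODE_DATA.getAbsolutePath())
        .printSchema();
// Expected shape (column order follows the CSV header):
//  root
//   |-- zip: string (nullable = true)
//   |-- lat: string (nullable = true)   <- string, but the bean declares Double
//   |-- lng: string (nullable = true)
```

The generated code then has a UTF8String in hand where the bean setter expects a Double, which is exactly the "No applicable constructor/method found" error above.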

After:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) {
    return spark.read()
            .option("header", true)
            .option("inferSchema", "true")
            .csv(ZIP_CODE_DATA.getAbsolutePath())
            .as(Encoders.bean(ZipCodeLatLon.class));
}
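An alternative, in case inferSchema ever guesses a column's type wrong (this is a sketch of my own, assuming the CSV header columns are named zip, lat, and lng): declare the schema explicitly with StructType. This avoids the extra pass over the file that inference needs, and it keeps zip typed as a string so zip codes with leading zeros (e.g. "00501") survive intact.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public static Dataset<ZipCodeLatLon> read(SparkSession spark) {
    // Explicit schema: column names must match the CSV header.
    StructType schema = new StructType()
            .add("zip", DataTypes.StringType)   // keep as string: preserves leading zeros
            .add("lat", DataTypes.DoubleType)
            .add("lng", DataTypes.DoubleType);
    return spark.read()
            .option("header", true)
            .schema(schema)
            .csv(ZIP_CODE_DATA.getAbsolutePath())
            .as(Encoders.bean(ZipCodeLatLon.class));
}
```

With inferSchema, a zip column containing only digits could plausibly be inferred as an integer type, which would both drop leading zeros and break the bean encoder's String field the same way the original error broke Double.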

Upvotes: 1
