SaleemKhair

Reputation: 541

Need help migrating from Spark 2.0 to Spark 3.1 - Accumulable to AccumulatorV2

I'm working on adding Spark 3.1 and Scala 2.12 support to the Kylo Data-Lake Management Platform.

I need help with migrating the following functions:

    /**
     * Creates an {@link Accumulable} shared variable with a name for display in the Spark UI.
     */
    @Nonnull
    static <R, P1> Accumulable<R, P1> accumulable(@Nonnull final R initialValue, @Nonnull final String name, @Nonnull final AccumulableParam<R, P1> param,
                                                  @Nonnull final KyloCatalogClient<Dataset<Row>> client) {
        return ((KyloCatalogClientV2) client).getSparkSession().sparkContext().accumulable(initialValue, name, param);
    }
    /**
     * Applies the specified function to the specified field of the data set.
     */
    @Nonnull
    static Dataset<Row> map(@Nonnull final Dataset<Row> dataSet, @Nonnull final String fieldName, @Nonnull final Function1 function, @Nonnull final DataType returnType) {
        final Seq<Column> inputs = Seq$.MODULE$.<Column>newBuilder().$plus$eq(dataSet.col(fieldName)).result();
        final UserDefinedFunction udf = new UserDefinedFunction(function, returnType, Option$.MODULE$.<Seq<DataType>>empty());
        return dataSet.withColumn(fieldName, udf.apply(inputs));
    }

They can be found here and here.

I'm adding a new Maven module, kylo-spark-catalog-spark-v3, to support apache-spark:3.1.2 and scala:2.12.10 at the time of writing.

I'm having trouble with:

  1. Creating an instance of AccumulatorV2, as the deprecation notice on the Accumulable class is not very clear. Here's my attempt at the first function - NOT COMPILING:
    @Nonnull
    static <R, P1> AccumulatorV2<R, P1> accumulable(@Nonnull final R initialValue, @Nonnull final String name, @Nonnull final AccumulatorV2<R, P1> param,
                                                  @Nonnull final KyloCatalogClient<Dataset<Row>> client) {
        AccumulatorV2<R, P1> acc = AccumulatorContext.get(AccumulatorContext.newId()).get();
        acc.register(((KyloCatalogClientV3) client).getSparkSession().sparkContext(), new Some<>(name), true);
        return acc;
    }
  2. Creating an instance of the UDF in the second function; UserDefinedFunction complains that it cannot be instantiated as it's an abstract class. Here's my attempt at the second function - COMPILING, but not sure if it makes sense:
    /**
     * Applies the specified function to the specified field of the data set.
     */
    @Nonnull
    static Dataset<Row> map(@Nonnull final Dataset<Row> dataSet, @Nonnull final String fieldName, @Nonnull final Function1 function, @Nonnull final DataType returnType) {
        final Seq<Column> inputs = Seq$.MODULE$.<Column>newBuilder().$plus$eq(dataSet.col(fieldName)).result();
        final UserDefinedFunction udf = udf(function, returnType);
        return dataSet.withColumn(fieldName, udf.apply(inputs));
    }

Can you please advise me on how to get this right, or point me to docs that are close to this case?

Upvotes: 0

Views: 1131

Answers (1)

ganczarek

Reputation: 11

1. accumulable


Older versions of Spark had two APIs for accumulators: Accumulable (when input and output types were different) and Accumulator (when input and output types were the same, i.e., Accumulable<T,T>). To create an Accumulable<OUT, IN> instance you needed an AccumulableParam<OUT, IN> that defined the "merge" operation (OUT+OUT), the "add" operation (OUT+IN) and the zero value of type OUT.

AccumulatorV2 is organised differently. It's an abstract class that needs to be extended with add and merge operations, plus a notion of the zero value (isZero, reset, copy). You can see an example implementation, CollectionAccumulator, in Spark's source code.
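
For illustration, here is a minimal sketch of a custom AccumulatorV2: a hypothetical StringListAccumulator that collects strings into a list, similar in spirit to CollectionAccumulator (names are illustrative):

    import org.apache.spark.util.AccumulatorV2;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical example: accumulates String inputs (IN) into a List<String> (OUT).
    public class StringListAccumulator extends AccumulatorV2<String, List<String>> {

        private final List<String> values = new ArrayList<>();

        @Override
        public boolean isZero() {
            return values.isEmpty(); // "zero" means nothing has been collected yet
        }

        @Override
        public AccumulatorV2<String, List<String>> copy() {
            final StringListAccumulator copy = new StringListAccumulator();
            copy.values.addAll(values);
            return copy;
        }

        @Override
        public void reset() {
            values.clear(); // back to the zero value
        }

        @Override
        public void add(String v) {
            values.add(v); // the OUT + IN operation
        }

        @Override
        public void merge(AccumulatorV2<String, List<String>> other) {
            values.addAll(other.value()); // the OUT + OUT operation
        }

        @Override
        public List<String> value() {
            return new ArrayList<>(values);
        }
    }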

The responsibility of the accumulable function changes with the AccumulatorV2 API. It doesn't need to create the accumulator anymore; it just needs to register it with the SparkContext under a given name. I think the following would make sense:

    @Nonnull
    static <R, P1> AccumulatorV2<P1, R> accumulable(
        @Nonnull final R initialValue, // unused
        @Nonnull final String name,
        @Nonnull final AccumulatorV2<P1, R> acc,
        @Nonnull final KyloCatalogClient<Dataset<Row>> client
    ) {
        ((KyloCatalogClientV3) client).getSparkSession().sparkContext().register(acc, name);
        return acc;
    }
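
A hypothetical call site, assuming the StringListAccumulator sketch above and an existing KyloCatalogClient<Dataset<Row>> client, could look like this:

    // Register a custom accumulator under a display name (sketch; names are illustrative).
    final StringListAccumulator acc = new StringListAccumulator();
    final AccumulatorV2<String, List<String>> registered =
            accumulable(Collections.<String>emptyList(), "collected-strings", acc, client);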

A few things to note:

  • The order of type parameters changed: Accumulable<OUT, IN> vs AccumulatorV2<IN, OUT>
  • initialValue needs to be passed to the accumulator at creation time. However, you need to make sure that copyAndReset returns a new accumulator with the zero value and not initialValue (see the sketch after this list). Alternatively, you could add a value of type P1 that, once added to the accumulator, makes it return the expected initial value of type R. It's not a very good idea, just an alternative.
  • Also, don't use AccumulatorContext, because, as per the documentation, it's

An internal class used to track accumulators by Spark itself.
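
To make the copyAndReset point concrete, here is a short sketch building on the hypothetical StringListAccumulator above: the initial value is seeded through a constructor, while copyAndReset explicitly returns a fresh zero accumulator rather than a re-seeded one.

    // Sketch: seed initialValue at construction time, but keep copyAndReset returning
    // a zero accumulator as AccumulatorV2 requires.
    public class SeededStringListAccumulator extends StringListAccumulator {

        public SeededStringListAccumulator(List<String> initialValue) {
            initialValue.forEach(this::add); // seed with the caller's initial value
        }

        @Override
        public AccumulatorV2<String, List<String>> copyAndReset() {
            return new StringListAccumulator(); // zero value, NOT initialValue
        }
    }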

2. map


I think your function is correct and makes sense. Between Spark v2.0 and v3.1, UserDefinedFunction changed to an abstract class, and you now create instances with the udf function (import static org.apache.spark.sql.functions.udf).
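
For reference, a minimal sketch of that pattern using the Java UDF1 overload of functions.udf (the column name and function are purely illustrative):

    import static org.apache.spark.sql.functions.udf;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.expressions.UserDefinedFunction;
    import org.apache.spark.sql.types.DataTypes;

    // Hypothetical example: upper-case the "name" column in place.
    static Dataset<Row> upperCaseName(final Dataset<Row> dataSet) {
        final UserDefinedFunction toUpper = udf(
                (UDF1<String, String>) s -> s == null ? null : s.toUpperCase(),
                DataTypes.StringType);
        return dataSet.withColumn("name", toUpper.apply(dataSet.col("name")));
    }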

Upvotes: 1
