wavemonger

Reputation: 71

How to build a simple spark-test-app.jar to test AWS SageMaker SparkJarProcessing

Does anyone know a repo that shows what a simple HelloWorld java or scala code would look like to build the jar that could be executed using the AWS SageMaker SparkJarProcessing class?

Readthedocs (https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_processing/spark_distributed_data_processing/sagemaker-spark-processing.html) mentions:

"In the next example, you’ll take a Spark application jar (located in ./code/spark-test-app.jar)..."

My question is: what does the source code for this jar (spark-test-app.jar) look like?

I tried building a jar from a simple Java project:

src>com.test>HW.java:


package com.test;

public class HW {
    public static void main(String[] args) {
        System.out.println("hello world!");
    }
}

and running it inside a SageMaker notebook (conda_python3 kernel) using

from sagemaker.spark.processing import SparkJarProcessor
from sagemaker import get_execution_role

role = get_execution_role()
print(role)

spark_processor = SparkJarProcessor(
    base_job_name="sm-spark-java",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

spark_processor.run(
    submit_app="./SparkJarProcessing-1.0-SNAPSHOT.jar",
    submit_class="com.test.HW",
    arguments=["--input", "abc"],
    logs=True,
)

But I end up getting an error: "Could not execute HW class."

Any sample source code for spark-test-app.jar would be highly appreciated!

Upvotes: 0

Views: 285

Answers (1)

Giuseppe A. Porcelli

Reputation: 36

To answer your question, the source code of that class looks like:

package com.amazonaws.sagemaker.spark.test;

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class HelloJavaSparkApp {

    public static void main(final String[] args) {
        System.out.println("Hello World, this is Java-Spark!");

        // Parse the required --input and --output arguments.
        final CommandLine parsedArgs = parseArgs(args);
        final String inputPath = parsedArgs.getOptionValue("input");
        final String outputPath = parsedArgs.getOptionValue("output");

        final SparkSession spark = SparkSession.builder().appName("Hello Spark App").getOrCreate();
        System.out.println("Got a Spark session with version: " + spark.version());

        // Read the input JSON data.
        System.out.println("Reading input from: " + inputPath);
        final Dataset<Row> salesDF = spark.read().json(inputPath);
        salesDF.printSchema();

        salesDF.createOrReplaceTempView("sales");
        final Dataset<Row> topDF = spark.sql("SELECT date, sale FROM sales WHERE sale > 750 SORT BY sale DESC");
        topDF.show();

        final Dataset<Row> avgDF = salesDF.groupBy("date").avg().orderBy("date");
        System.out.println("Collected average sales: " + StringUtils.join(avgDF.collectAsList()));

        // Register a simple UDF and use it to add a computed column.
        spark.sqlContext().udf().register("double", (UDF1<Long, Long>) n -> n + n, DataTypes.LongType);
        final Dataset<Row> saleDoubleDF =
                salesDF.selectExpr("date", "sale", "double(sale) as sale_double").orderBy("date", "sale");
        saleDoubleDF.show();

        // Write the result as a single JSON file.
        System.out.println("Writing output to: " + outputPath);
        saleDoubleDF.coalesce(1).write().json(outputPath);

        spark.stop();
    }

    private static CommandLine parseArgs(final String[] args) {
        final Options options = new Options();
        final CommandLineParser parser = new BasicParser();

        final Option input = new Option("i", "input", true, "input path");
        input.setRequired(true);
        options.addOption(input);

        final Option output = new Option("o", "output", true, "output path");
        output.setRequired(true);
        options.addOption(output);

        try {
            return parser.parse(options, args);
        } catch (ParseException e) {
            new HelpFormatter().printHelp("HelloJavaSparkApp --input /opt/ml/input/foo --output /opt/ml/output/bar", options);
            throw new RuntimeException(e);
        }
    }
}

I have also created a simple example that shows how to run a hello world app here. Please note that I ran that example in Amazon SageMaker Studio Notebooks, using the Data Science 1.0 kernel.
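For reference, a minimal sketch of how a run could look from the notebook side, reusing the processor configuration from your question. Note that the class above requires both --input and --output; the jar location and the S3 bucket/prefix values below are placeholders you would replace with your own:

from sagemaker.spark.processing import SparkJarProcessor
from sagemaker import get_execution_role

role = get_execution_role()

# Same processor configuration as in the question.
spark_processor = SparkJarProcessor(
    base_job_name="sm-spark-java",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

# HelloJavaSparkApp parses --input and --output, so both must be passed.
# The S3 URIs below are placeholders for your own bucket and prefixes.
spark_processor.run(
    submit_app="./code/spark-test-app.jar",
    submit_class="com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp",
    arguments=[
        "--input", "s3://<your-bucket>/<prefix>/input/",
        "--output", "s3://<your-bucket>/<prefix>/output/",
    ],
    logs=True,
)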

Hope this helps.

Upvotes: 1
