bhomass

Reputation: 3572

Can you use spark-shell programmatically?

Is it possible to run a spark-shell from a Java or Scala program? In other words, start a spark-shell session inside a Java program, pass Spark code to it, read back the response, and continue the interaction inside the code.

Upvotes: 3

Views: 2984

Answers (2)

NehaM

Reputation: 1352

This is a working solution on Spark 1.6.0 and Scala 2.10: create a SparkIMain with Settings, then bind the variables and values together with their types.

import org.apache.spark.repl.SparkIMain
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.tools.nsc.GenericRunnerSettings
class TestMain {
  def exec(): Unit = {
    // Interpreter settings: use the JVM classpath so Spark's classes are visible to the REPL
    val settings = new GenericRunnerSettings(println _)
    settings.usejavacp.value = true
    val interpreter = new SparkIMain(settings)

    val conf = new SparkConf().setAppName("TestMain").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Spark code to interpret at runtime, exactly as it would be typed in spark-shell
    val methodChain =
      """
        val df = sqlContext.read
              .format("com.databricks.spark.csv")
              .option("header", "false")
              .option("inferSchema", "true")
              .option("treatEmptyValuesAsNulls", "true")
              .option("parserLib", "univocity")
              .load("example-data.csv")

        df.show()

      """
    // Make the already-created sqlContext visible inside the interpreted code
    interpreter.bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext)
    val resultFlag = interpreter.interpret(methodChain)
  }
}

object TestInterpreter {

  def main(args: Array[String]) {
    val testMain = new TestMain()
    testMain.exec()
    System.exit(0)
  }
}
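
The question also asks about reading the response back and continuing the interaction. The same interpreter instance keeps its state between interpret calls, so follow-up snippets can be sent and their outcome inspected. Below is a minimal sketch meant to be appended at the end of exec(); it assumes SparkIMain follows the standard Scala 2.10 IMain contract, i.e. interpret returns a scala.tools.nsc.interpreter.Results value and a valueOfTerm method is available for pulling a REPL value back into the host program.

import scala.tools.nsc.interpreter.Results

// The df defined inside methodChain lives on in the interpreter's state,
// so follow-up snippets can keep using it, just like in an interactive session
resultFlag match {
  case Results.Success =>
    interpreter.interpret("val rowCount = df.count()")
    // valueOfTerm is how the standard IMain hands a REPL value back to the
    // host program; it is assumed here that SparkIMain exposes the same method
    val rowCount: Option[Any] = interpreter.valueOfTerm("rowCount")
    println(s"rows read back into the host program: $rowCount")
  case Results.Error      => println("snippet failed to compile or threw an exception")
  case Results.Incomplete => println("snippet was syntactically incomplete")
}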

Upvotes: 1

MitsakosGR

Reputation: 531

If you want to use spark-shell, you can always launch it from Java and then capture its stdin and stdout to pass text to it and read its responses.

OutputStream stdin = null;
InputStream stderr = null;
InputStream stdout = null;

// Launch spark-shell as a child process and grab its standard streams
Process process = Runtime.getRuntime().exec("spark-shell");
stdin = process.getOutputStream();   // write commands here
stderr = process.getErrorStream();   // read error output here
stdout = process.getInputStream();   // read the shell's responses here
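
Once the streams are captured, the interaction itself is plain stream I/O: write a line of Spark code to the process's stdin and read whatever the shell prints from its stdout. Since the question mentions Scala as well, here is a minimal Scala sketch of that round trip; it assumes spark-shell is on the PATH, and simply echoing the shell's raw output stands in for real response parsing.

import java.io.{BufferedReader, InputStreamReader, PrintWriter}

// Spawn spark-shell and wire up its streams (same idea as the Java snippet above)
val process   = Runtime.getRuntime.exec("spark-shell")
val toShell   = new PrintWriter(process.getOutputStream, true) // autoflush so the shell sees each line
val fromShell = new BufferedReader(new InputStreamReader(process.getInputStream))

// Push one command into the session, exactly as you would type it at the scala> prompt
toShell.println("sc.parallelize(1 to 10).sum()")
toShell.println(":quit") // end the session so the read loop below terminates

// Read everything the shell printed; a real program would parse the lines it cares about
var line = fromShell.readLine()
while (line != null) {
  println(s"spark-shell> $line")
  line = fromShell.readLine()
}
process.waitFor()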

But there is actually no reason to do so. The spark-shell is mostly for learning and testing. Everything you can do from the shell, you can also do from a Java app, even interactively.

Consider the following example: you want to count error lines, and if there are more than 100, ask the user whether to display them on the console; if there are 100 or fewer, display them anyway:

// sc is an existing JavaSparkContext
JavaRDD<String> lines = sc.textFile("hdfs://log.txt").filter(s -> s.contains("error"));
if (lines.count() > 100)
{
    System.out.println("Errors are more than 100, do you wish to display them? (y/n)");

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    if (br.readLine().equals("y"))
    {
        List<String> errors = lines.collect();
        for (String s : errors)
            System.out.println(s);
    }
}
else
{
    List<String> errors = lines.collect();
    for (String s : errors)
        System.out.println(s);
}

Upvotes: 1
