tschmit007

Reputation: 7800

Is it possible (and how) to specify an SQL query on the command line with spark-submit?

I have the following code:

def main(args: Array[String]) {
    var dvfFiles: String = "g:/data/gouv/dvf/raw"
    var q: String = ""
    //q = "SELECT distinct DateMutation, NVoie, IndVoie, Voie, Valeur, CodeTypeLocal, TypeLocal, Commune FROM mutations WHERE Commune = 'ICI' and Valeur > 100000 and CodeTypeLocal in (1, 2) order by Valeur desc"

    // Walk the arguments as ("--flag", value) pairs
    args.sliding(2, 2).toList.collect {
        case Array("--sfiles", argFiles: String) => dvfFiles = argFiles
        case Array("--squery", argQ: String) => q = argQ
    }
    println(s"files from: ${dvfFiles}")
    // ... rest of main elided
}

If I run the following command:

G:\dev\fromGit\dvf\spark>spark-submit .\target\scala-2.11\dfvqueryer_2.11-1.0.jar \
--squery "SELECT distinct DateMutation, NVoie, IndVoie, Voie, Valeur, CodeTypeLocal, \
TypeLocal, Commune FROM mutations WHERE (Commune = 'ICI') and (Valeur > 100000) and (CodeTypeLocal in (1, 2)) order by Valeur desc"

I got the following result:

== SQL ==

SELECT distinct DateMutation, NVoie, IndVoie, Voie, Valeur, CodeTypeLocal, TypeLocal, Commune FROM mutations WHERE (Commune = 'ICI') and (Valeur and (CodeTypeLocal in (1, 2)) order by Valeur desc
----------------------------------------------------------------------------------------------^^^

The ^^^ points at the FROM.

I also notice that the > 100000 after Valeur is missing.

The query itself is correct: if I uncomment the //q = ... line, package the code, and submit it, everything works fine.

Upvotes: 2

Views: 818

Answers (1)

afeldman

Reputation: 512

It seems that part of the query is being eaten during input. One solution would be to pass the entire SELECT query as a single argument and read it into a string value; in that form it can be handed straight to the sql function to run your query. Below is how you can build out the function:

//The Package Tree
package stack.overFlow

//Import only what the function actually needs
import org.apache.spark.sql.{DataFrame, SparkSession}

//Object Name
object demoCode {
  def main(args: Array[String]): Unit = {
    //Build the session; SparkSession subsumes the old SparkContext/SQLContext entry points
    val spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    //Set the query as a string from argument 1
    val commandQuery: String = args(0)

    //Pass the query to the sql function
    val inputDF: DataFrame = spark.sql(commandQuery)

    //Use the result, e.g. print the first rows
    inputDF.show()
  }
}

Once the code compiles you need two things: (1) the jar, and (2) the package tree and class to run from it. With both of those supplied via --class, all you need to do is add a space and pass the SQL query through as a single quoted argument, so at run time it is loaded into the Spark session.

spark-submit --class stack.overFlow.demoCode /home/user/demo_code/target/demoCode-compilation-jar.jar \
"SELECT distinct DateMutation, NVoie, IndVoie, Voie, Valeur, CodeTypeLocal, TypeLocal, Commune FROM mutations WHERE (Commune = 'ICI') and (Valeur > 100000) and (CodeTypeLocal in (1, 2)) order by Valeur desc"
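
As a quick sanity check, a minimal sketch: print the raw arguments inside main to confirm the quoted query arrives as a single element. Without the surrounding quotes the shell splits the statement on whitespace and args(0) would contain only SELECT:

//Optional debug inside main: confirm the query arrived as one argument
args.zipWithIndex.foreach { case (a, i) => println(s"args($i) = $a") }
//Quoted:   args(0) = SELECT distinct DateMutation, ... order by Valeur desc
//Unquoted: args(0) = SELECT, args(1) = distinct, ...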

Would this help your use-case or do you need it to be in another format?

Upvotes: 2
