Haha

Reputation: 1009

Optimizing Spark/Scala speed

I have a Spark script that connects to Hive, reads data from several databases, and writes the union into a CSV file. I tested it with two databases and it took 20 minutes. Now I am trying it with 11 databases, and it has been running since yesterday evening (18 hours!). The script is supposed to fetch between 400,000 and 800,000 rows per database.

My question is: is 18 hours normal for such jobs? If not, how can I optimize it? This is what my main does:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// This is the list of the ten source databases:
val use_database_sigma = List(
  Parametre_vigiliste.sourceDbSigmaGca, Parametre_vigiliste.sourceDbSigmaGcm,
  Parametre_vigiliste.sourceDbSigmaGge, Parametre_vigiliste.sourceDbSigmaGne,
  Parametre_vigiliste.sourceDbSigmaGoc, Parametre_vigiliste.sourceDbSigmaGoi,
  Parametre_vigiliste.sourceDbSigmaGra, Parametre_vigiliste.sourceDbSigmaGsu,
  Parametre_vigiliste.sourceDbSigmaPvl, Parametre_vigiliste.sourceDbSigmaLbr)


val grc = Tables.getGRC(spark) // This creates the first dataframe

// Build the SIGMA dataframe: read the first database, then union in the rest
var sigma = Tables.getSIGMA(spark, use_database_sigma(0))
for (i <- 1 until use_database_sigma.length) {
  if (use_database_sigma(i) != "") {
    sigma = sigma.union(Tables.getSIGMA(spark, use_database_sigma(i)))
  }
}

// Union of the two dataframes, written to a single CSV file
val grc_sigma = sigma.union(grc)
grc_sigma.cache()
LogDev.ecrireligne("total : " + grc_sigma.count())
grc_sigma.repartition(1).write.mode(SaveMode.Overwrite).format("csv")
  .option("header", true).option("delimiter", "|")
  .save(Parametre_vigiliste.cible)

// Rename the single part file produced by Spark
val conf = new Configuration()
val fs = FileSystem.get(conf)
val file = fs.globStatus(new Path(Parametre_vigiliste.cible + "/part*"))(0).getPath().getName()
fs.rename(new Path(Parametre_vigiliste.cible + "/" + file),
          new Path(Parametre_vigiliste.cible + "/FIC_PER_DATALAKE_.csv"))
grc_sigma.unpersist()

Upvotes: 0

Views: 299

Answers (1)

Molotch

Reputation: 475

Not written in an IDE so it might be off somewhere, but you get the general idea.

val frames = Seq("table1", "table2").map { table =>
  spark.read.table(table).cache() // read each table once and keep it in memory
}

frames
  .reduce(_.union(_)) // or unionByName() if the columns aren't in the same order
  .repartition(1)
  .write
  .mode(SaveMode.Overwrite)
  .format("csv")
  .options(Map("header" -> "true", "delimiter" -> "|"))
  .save("filePathName")
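The idea is the same as your loop, just expressed as a fold: read each table into a collection of dataframes, then reduce them with union instead of mutating a var. Adapted to the names from your question it would look roughly like this (untested, so treat it as a sketch; Tables.getSIGMA, Tables.getGRC and use_database_sigma are assumed to behave exactly as in your code):

// Sketch using the identifiers from the question (untested)
val sigmaFrames = use_database_sigma
  .filter(_.nonEmpty)                    // skip empty database names
  .map(db => Tables.getSIGMA(spark, db)) // one dataframe per database

val grc_sigma = (Tables.getGRC(spark) +: sigmaFrames)
  .reduce(_.union(_)) // single fold instead of a var and a for loop
  .cache()            // reused by both count() and the write

LogDev.ecrireligne("total : " + grc_sigma.count())

Also note that repartition(1) funnels the entire result through a single task, so the final write can't be parallelised. If the Spark UI shows most of the 18 hours going into that last stage, write without repartition(1) and merge the part files afterwards.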

Upvotes: 2
