ben10

Reputation: 83

In Spark, writing a dataset into the database is preceded by a long, unexplained delay before the save operation starts

I ran the spark-submit command shown below. The job loads the datasets from the DB, processes them, and in the final stage pushes multiple datasets into an Oracle DB.

./spark-submit --class com.sample.Transformation --conf spark.sql.shuffle.partitions=5001 
    --num-executors=40 --executor-cores=1 --executor-memory=5G 
    --jars /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-api-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/drools-core-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/drools-compiler-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-maven-support-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-internal-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/xstream-1.4.10.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-commons-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/ecj-4.4.2.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/mvel2-2.4.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-commons-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-api-7.7.0.Final.jar 
    --driver-class-path /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar 
    --master spark://10.180.181.41:7077 "/scratch/rmbbuild/spark_ormb/POC-jar/Transformation-0.0.1-SNAPSHOT.jar" 
        > /scratch/rmbbuild/spark_ormb/POC-jar/logs/logs12.txt

However, the job spends a long time before each dataset is written into the DB, and I don't know why it consumes this much time before the write process even starts. I am attaching a screenshot that clearly highlights the problem I am facing; please go through the screenshot before suggesting a solution. Spark Dashboard Stages screenshot:

In the screenshot I have highlighted the roughly 10 minutes that are consumed before every dataset write into the DB. I even changed the batch size to 100000, as follows:

outputDataSetforsummary.write().mode("append").format("jdbc").option("url", connection)
    .option("batchSize", "100000").option("dbtable", CI_TXN_DTL).save();

Can anyone explain why this pre-write time is consumed every time, and how to avoid it?

I am attaching the code below to describe the program in more detail.

   public static void main(String[] args) {

        // SparkConf conf = new SparkConf().setAppName("Transformation").setMaster("local");
        SparkConf conf = new SparkConf().setAppName("Transformation").setMaster("spark://xx.xx.xx.xx:7077");
        String connection = "jdbc:oracle:thin:ABC/abc@//xx.x.x.x:1521/ABC";
    
        // Create Spark Context
        SparkContext context = new SparkContext(conf);
        // Create Spark Session
        SparkSession sparkSession = new SparkSession(context);
        Dataset<Row> txnDf  = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_TXN_DETAIL_STG).load();
        //Dataset<Row> txnDf  = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_TXN_DETAIL_STG").load();
        Dataset<Row> newTxnDf  = txnDf.drop(ACCT_ID);
        
        Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_ACCT_NBR).load();
        //  Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_ACCT_NBR").load();

        Dataset<Row> joined = newTxnDf.join(accountDf, newTxnDf.col(ACCT_NBR).equalTo(accountDf.col(ACCT_NBR))
                .and(newTxnDf.col(ACCT_NBR_TYPE_CD).equalTo(accountDf.col(ACCT_NBR_TYPE_CD))), "inner");
        Dataset<Row> finalJoined = joined.drop(accountDf.col(ACCT_NBR_TYPE_CD)).drop(accountDf.col(ACCT_NBR))
                .drop(accountDf.col(VERSION)).drop(accountDf.col(PRIM_SW));

        
        initializeProductDerivationCache(sparkSession,connection);
      
        
        ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);
        
        ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);
        
        ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);
        
    
        
        Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
        Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);
        Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),
                                                            broadcastVarForUserID.value()),encoder);
        
    
        Dataset<RuleParamsBean> filteredDS = validateDataset.filter(validateDataset.col(BO_STATUS_CD).notEqual(TFMAppConstants.TXN_INVALID));
        //For formatting the data to be inserted in table -->   Dataset<Row>finalvalidateDataset = validateDataset.select("ACCT_ID");
        

    
        Encoder<TxnDetailOutput>txndetailencoder = Encoders.bean(TxnDetailOutput.class);
        Dataset<TxnDetailOutput>txndetailDS =validateDataset.map(ruleParamsBean ->outputfortxndetail(ruleParamsBean),txndetailencoder );
        
    
        
        
        KieServices ks = KieServices.Factory.get();
        KieContainer kContainer = ks.getKieClasspathContainer();
        ClassTag<KieBase> classTagTest = scala.reflect.ClassTag$.MODULE$.apply(KieBase.class);
        Broadcast<KieBase> broadcastRules = context.broadcast(kContainer.getKieBase(KIE_BASE), classTagTest);

        Encoder<PritmRuleOutput> outputEncoder = Encoders.bean(PritmRuleOutput.class);
        Dataset<PritmRuleOutput> outputDataSet = filteredDS.flatMap(rulesParamBean -> droolprocesMap(broadcastRules.value(), rulesParamBean), outputEncoder);

        Dataset<Row>piParamDS1 =outputDataSet.select(PRICEITEM_PARM_GRP_VAL);
        Dataset<Row> piParamDS = piParamDS1.withColumnRenamed(PRICEITEM_PARM_GRP_VAL, PARM_STR);

        priceItemParamGrpValueCache.createOrReplaceTempView("temp1");
        Dataset<Row>piParamDSS = piParamDS.where(queryToFiltertheDuplicateParamVal);
        Dataset<Row> priceItemParamsGrpDS = piParamDSS.select(PARM_STR).distinct().withColumn(PRICEITEM_PARM_GRP_ID, functions.monotonically_increasing_id());
        Dataset<Row> finalpriceItemParamsGrpDS = priceItemParamsGrpDS.withColumn(PARM_COUNT, functions.size(functions.split(priceItemParamsGrpDS.col(PARM_STR),TOKENIZER)));
        finalpriceItemParamsGrpDS.persist(StorageLevel.MEMORY_ONLY());
        finalpriceItemParamsGrpDS.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP_K).option("batchSize", "1000").save();

            
        
        Dataset<Row> PritmOutput = outputDataSet.join(priceItemParamsGrpDS,outputDataSet.col(PRICEITEM_PARM_GRP_VAL).equalTo(priceItemParamsGrpDS.col(PARM_STR)),"inner");
        Dataset<Row> samplePritmOutput = PritmOutput.drop(outputDataSet.col(PRICEITEM_PARM_GRP_ID))
                .drop(priceItemParamsGrpDS.col(PARM_STR));

        priceItemParamsGrpDS.createOrReplaceTempView(PARM_STR);
        Dataset<Row> priceItemParamsGroupTable =sparkSession.sql(FETCH_QUERY_TO_SPLIT);
        Dataset<Row> finalpriceItemParamsGroupTable = priceItemParamsGroupTable.selectExpr("PRICEITEM_PARM_GRP_ID","split(col, '=')[0] as PRICEITEM_PARM_CD ","split(col, '=')[1] as PRICEITEM_PARM_VAL");
        finalpriceItemParamsGroupTable.persist(StorageLevel.MEMORY_ONLY());
        finalpriceItemParamsGroupTable.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP).option("batchSize", "1000").save();
}

Upvotes: 1

Views: 604

Answers (1)

tauitdnmd

Reputation: 368

Spark reloads the whole data and re-joins the data frames again and again for every write-to-DB action.

Please add validateDataset.persist(StorageLevel.MEMORY_ONLY()). (You should decide between memory, disk, or memory-and-disk yourself, depending on your data frame size and whether it fits in memory or not.)

For example:

 Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),broadcastVarForUserID.value()),encoder)
.persist(StorageLevel.MEMORY_ONLY());
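
If the mapped dataset is too big to hold entirely in executor memory, the same pattern can persist with MEMORY_AND_DISK instead (a sketch of that variant; choose the storage level based on your data size):

    Dataset<RuleParamsBean> validateDataset = ds
        .map(ruleParamsBean -> validateTransaction(ruleParamsBean,
                broadcastVarForDiv.value(),
                broadcastVarForCurrency.value(),
                broadcastVarForUserID.value()), encoder)
        // partitions that do not fit in memory are spilled to local disk
        // instead of being dropped and recomputed on the next write action
        .persist(StorageLevel.MEMORY_AND_DISK());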

Upvotes: 1
