Saurabh Sharma

Reputation: 325

Regarding Spark DataFrameReader jdbc

I have a question about the mechanics of Spark's DataFrameReader. I would appreciate it if anybody could help me. Let me explain the scenario.

I am creating a DataFrame from a DStream like this. This is the input data:

    var config = new HashMap[String, String]()
    config += ("zookeeper.connect" -> zookeeper)
    config += ("partition.assignment.strategy" -> "roundrobin")
    config += ("bootstrap.servers" -> broker)
    config += ("serializer.class" -> "kafka.serializer.DefaultEncoder")
    config += ("group.id" -> "default")

    val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, config.toMap, Set(topic)).map(_._2)

    lines.foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
            val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }
            val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)
            val rddDF = sqlContext.read.json(rddJson)
            rddDF.registerTempTable("inputData")

            val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD", "A", numOfPartiton, lowerBound, upperBound)

Here is the code of ReadDataFrameHelper

    def readDataFrameHelperFromDB(sqlContext: HiveContext, jdbcUrl: String, dbTableOrQuery: String,
            columnToPartition: String, numOfPartiton: Int, lowerBound: Int, highBound: Int): DataFrame = {

        val jdbcDF = sqlContext.read.jdbc(
            url = jdbcUrl,
            table = dbTableOrQuery,
            columnName = columnToPartition,
            lowerBound = lowerBound,
            upperBound = highBound,
            numPartitions = numOfPartiton,
            connectionProperties = new java.util.Properties()
        )

        jdbcDF
    }

Lastly, I am doing a join like this:

    val joinedData = rddDF.join(dbDF,
                                rddDF("ID") === dbDF("ID") && rddDF("CODE") === dbDF("CODE"),
                                "left_outer")
                          .drop(dbDF("code"))
                          .drop(dbDF("id"))
                          .drop(dbDF("number"))
                          .drop(dbDF("key"))
                          .drop(dbDF("loaddate"))
                          .drop(dbDF("fid"))
    joinedData.show()

Each batch of my input DStream will have about 1000 rows, and the database table contains millions of rows. So when I do this join, will Spark load all the rows from the database, or will it read only those rows whose CODE and ID appear in the input DStream?

Upvotes: 1

Views: 959

Answers (1)

Saurabh Sharma

Reputation: 325

As zero323 pointed out, the data is read from the table in full, and I have confirmed this myself. I checked the DB session logs and saw that the whole dataset is being loaded.

Thanks zero323
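
Since the join itself does not limit what the JDBC source reads, one possible workaround is to put the filter into the query that is sent to the database. The sketch below is only an illustration under some assumptions: the helper `PushdownQuery` is hypothetical (it is not part of Spark), `ID` and `CODE` are assumed to be string columns, and the batch's keys are assumed to be small enough to collect on the driver. It builds a parenthesised subquery that can be passed as the `table` argument of `read.jdbc`.

```scala
object PushdownQuery {
  // Escape single quotes so a key value cannot terminate the SQL string literal.
  private def quote(value: String): String =
    "'" + value.replace("'", "''") + "'"

  /** Builds a subquery that selects only the rows matching the given (ID, CODE) pairs.
    * Assumption: ID and CODE are string columns; adjust the quoting for numeric keys.
    */
  def build(table: String, keys: Seq[(String, String)]): String = {
    require(keys.nonEmpty, "at least one (ID, CODE) pair is required")
    val predicate = keys.distinct
      .map { case (id, code) => s"(ID = ${quote(id)} AND CODE = ${quote(code)})" }
      .mkString(" OR ")
    // The alias is needed because Spark wraps the `table` argument in its own SELECT.
    s"(SELECT * FROM $table WHERE $predicate) AS filtered"
  }
}
```

Inside `foreachRDD`, the keys could be collected with something like `rddDF.select("ID", "CODE").distinct().collect()`, mapped to pairs of strings, and the result of `PushdownQuery.build("ABCD", keys)` passed to `sqlContext.read.jdbc(jdbcUrl, query, new java.util.Properties())`. With about 1000 rows per batch the generated WHERE clause stays manageable, but very large key sets may hit the database's limits on statement size. Some databases (Oracle, for example) expect the table alias without the `AS` keyword.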

Upvotes: 2
