Reputation: 71
Please take a look at the code below. I am getting an error when I pass a value for the number of partitions.
def loadDataFromPostgress(sqlContext: SQLContext, tableName: String,
                          columnName: String, dbURL: String, userName: String, pwd: String,
                          partitions: String): DataFrame = {
  println("the no of partitions are : " + partitions)
  var dataDF = sqlContext.read.format("jdbc").options(
    scala.collection.Map("url" -> dbURL,
      "dbtable" -> tableName,
      "driver" -> "org.postgresql.Driver",
      "user" -> userName,
      "password" -> pwd,
      "partitionColumn" -> columnName,
      "numPartitions" -> "1000")).load()
  return dataDF
}
error:
java.lang.RuntimeException: Partitioning incompletely specified
App > at scala.sys.package$.error(package.scala:27)
App > at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:38)
App > at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315)
App > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
App > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
App > at Test$.loadDataFromGreenPlum(script.scala:28)
App > at Test$.loadDataFrame(script.scala:15)
App > at Test$.main(script.scala:59)
App > at Test.main(script.scala)
App > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
App > at
Upvotes: 0
Views: 836
Reputation: 1326
You can check the code below for exactly how to use it.
def loadDataFromPostgress(sqlContext: SQLContext, tableName: String,
                          columnName: String, dbURL: String, userName: String,
                          pwd: String, partitions: String): DataFrame = {
  println("the no of partitions are : " + partitions)
  val dataDF = sqlContext.read.format("jdbc").options(
    scala.collection.Map(
      "url" -> dbURL,
      // derive a numeric column (hash_code) to partition on
      "dbtable" -> "(select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp) as t",
      "driver" -> "org.postgresql.Driver",
      "user" -> userName,
      "password" -> pwd,
      "partitionColumn" -> "hash_code",
      "lowerBound" -> "0",
      "upperBound" -> "10",
      "numPartitions" -> "10"
    )).load()
  dataDF
}
The above code will create 10 tasks with 10 queries, as shown below. Before that, the job will work out the offset:
offset = (upperBound-lowerBound)/numPartitions
Here offset = (10-0)/10 = 1
select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp where hash_code < 1 or hash_code is null
select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp where hash_code >= 1 and hash_code < 2
.
.
select mod(tmp.empid,10) as hash_code,tmp.* from employee as tmp where hash_code >= 9
This will create 10 partitions:
empids ending with 0 go to one partition, since mod(empid,10) always equals 0
empids ending with 1 go to another partition, since mod(empid,10) always equals 1
and in this way all employee rows are split across 10 partitions.
You have to change the partitionColumn, lowerBound, upperBound and numPartitions values according to your requirements.
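For example, here is a minimal sketch of the function from the question with the two missing options added and the hard-coded values turned into parameters (lowerBound and upperBound are assumed to be known for the chosen numeric column; the values in the call at the bottom are placeholders, not taken from your database):

import org.apache.spark.sql.{DataFrame, SQLContext}

def loadDataFromPostgress(sqlContext: SQLContext, tableName: String,
                          columnName: String, dbURL: String, userName: String,
                          pwd: String, partitions: String,
                          lowerBound: String, upperBound: String): DataFrame = {
  sqlContext.read.format("jdbc").options(
    scala.collection.Map(
      "url" -> dbURL,
      "dbtable" -> tableName,
      "driver" -> "org.postgresql.Driver",
      "user" -> userName,
      "password" -> pwd,
      "partitionColumn" -> columnName,   // must be a numeric column
      "lowerBound" -> lowerBound,        // smallest expected value of the partition column
      "upperBound" -> upperBound,        // largest expected value of the partition column
      "numPartitions" -> partitions      // use the parameter instead of a hard-coded value
    )).load()
}

// hypothetical call; URL, credentials and bounds are placeholders
val df = loadDataFromPostgress(sqlContext, "employee", "empid",
  "jdbc:postgresql://host:5432/mydb", "user", "pwd", "10", "1", "100000")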
Hope my answer helps you.
Upvotes: 3
Reputation: 36
Partitioning requires all of partitionColumn, numPartitions, lowerBound and upperBound. The last two are missing from your options, and this is why you get the error.
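As a reference, a minimal sketch of an options map with all four partitioning settings present (the column name and bounds here are placeholders for illustration, not taken from your schema):

val df = sqlContext.read.format("jdbc").options(
  scala.collection.Map(
    "url" -> dbURL,
    "dbtable" -> tableName,
    "driver" -> "org.postgresql.Driver",
    "user" -> userName,
    "password" -> pwd,
    "partitionColumn" -> "empid",   // numeric column to split on (placeholder)
    "numPartitions" -> "10",
    "lowerBound" -> "1",            // was missing in your code
    "upperBound" -> "100000"        // was missing in your code
  )).load()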
Upvotes: 0