Reputation: 6159
I'm fetching a large amount of data from a MySQL database using LIMIT and OFFSET, like this:
var offset = 0
for (s <- a to partition) {
  val query = "(select * from destination LIMIT 100000 OFFSET " + offset + ") as src"
  data = data.union(spark.read.jdbc(url, query, connectionProperties).rdd.map(_.mkString(","))).persist(StorageLevel.DISK_ONLY)
  offset += 100000
}
val dest = data.collect.toArray
val s = spark.sparkContext.parallelize(dest, 1).persist(StorageLevel.DISK_ONLY).saveAsTextFile("/home/hduser/Desktop/testing")
For a small amount of data it works fine, but for a large amount of data it throws java.lang.OutOfMemoryError: Java heap space.
So if I could persist val dest = data.collect.toArray
it would work as expected. Sorry for such a naive question, I'm new to Spark.
Partition method:
val query = "(select * from destination) as dest"
val options = Map(
  "url" -> "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false",
  "dbtable" -> query,
  "user" -> "root",
  "password" -> "root")
// jdbc(url, table, columnName = "0", lowerBound = 1, upperBound = 5, numPartitions = 4, props)
val destination = spark.read.options(options)
  .jdbc(options("url"), options("dbtable"), "0", 1, 5, 4, new java.util.Properties())
  .rdd.map(_.mkString(","))
  .persist(StorageLevel.DISK_ONLY)
  .saveAsTextFile("/home/hduser/Desktop/testing")
Thank you
Upvotes: 0
Views: 1269
Reputation: 74769
I'm fetching a large amount of data
That's why you use Spark, isn't it? :)
for (s <- a to partition)
val dest = data.collect.toArray
spark.sparkContext.parallelize(dest, 1)
NOTE: Don't do that. I'd even call it a Spark anti-pattern: you load a dataset on the executors (from MySQL over JDBC) only to transfer this "large amount of data" to the driver, which in turn transfers it back to the executors to save it to disk.
It's as if you wanted to get rid of Spark's distributed processing and pay for all those driver round trips over the network instead.
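A minimal sketch of the direct route, reusing the spark session, URL, credentials and output path from the question (everything else is illustrative):

// Load the table over JDBC; the rows stay distributed across the executors.
val destination = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false")
  .option("dbtable", "destination")
  .option("user", "root")
  .option("password", "root")
  .load()
// Write straight from the executors: no collect, no parallelize, no driver round trip.
destination.rdd.map(_.mkString(",")).saveAsTextFile("/home/hduser/Desktop/testing")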
spark.read.jdbc supports partitioning your dataset at load time out of the box using the partitionColumn, lowerBound and upperBound options (see JDBC To Other Databases), or the (undocumented) predicates option.
partitionColumn, lowerBound, upperBound describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
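For illustration, a sketch of both variants. It assumes destination has a numeric column named id whose values roughly span 1 to 1000000; the column name and bounds are placeholders you would take from your own schema:

// Option-based partitioning: Spark splits the id range into numPartitions strides
// and issues one JDBC query per partition on the executors.
val partitioned = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false")
  .option("dbtable", "destination")
  .option("user", "root")
  .option("password", "root")
  .option("partitionColumn", "id")   // assumption: a numeric column in the table
  .option("lowerBound", "1")         // only sets the stride, does not filter rows
  .option("upperBound", "1000000")
  .option("numPartitions", "4")
  .load()

// predicates variant: one partition per WHERE clause.
val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", "root")
connectionProperties.setProperty("password", "root")
val predicates = Array(
  "id < 250000",
  "id >= 250000 AND id < 500000",
  "id >= 500000 AND id < 750000",
  "id >= 750000")
val byPredicates = spark.read.jdbc(
  "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false",
  "destination", predicates, connectionProperties)

Either DataFrame can then be written out directly, as in the sketch above, without ever collecting to the driver.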
Let Spark do its job(s).
Upvotes: 2