Pyd

Reputation: 6159

How to load large amount of data from MySQL and save as text file?

I'm fetching a large amount of data from a MySQL database using LIMIT and OFFSET, like:

var offset = 0
for (s <- a to partition) {

  val query = "(select * from destination LIMIT 100000 OFFSET " + offset + ") as src"
  data = data.union(spark.read.jdbc(url, query, connectionProperties).rdd.map(_.mkString(","))).persist(StorageLevel.DISK_ONLY)
  offset += 100000
}
val dest = data.collect.toArray
val s = spark.sparkContext.parallelize(dest, 1).persist(StorageLevel.DISK_ONLY).saveAsTextFile("/home/hduser/Desktop/testing")

For a small amount of data it works fine, but for a large amount it throws an error like java.lang.OutOfMemoryError: Java heap space. If I could persist val dest = data.collect.toArray it would work as expected. Sorry for such a naive question, I'm new to Spark.

Partition method:

val query = "(select * from destination) as dest"
val options = Map(
  "url" -> "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false",
  "dbtable" -> query,
  "user" -> "root",
  "password" -> "root")

val destination = spark.read.options(options)
  .jdbc(options("url"), options("dbtable"), "0", 1, 5, 4, new java.util.Properties())
  .rdd.map(_.mkString(","))
  .persist(StorageLevel.DISK_ONLY)
  .saveAsTextFile("/home/hduser/Desktop/testing")

Thank you

Upvotes: 0

Views: 1269

Answers (1)

Jacek Laskowski

Reputation: 74769

I'm fetching a large amount of data

That's why you use Spark, isn't it? :)

 for (s <- a to partition)    
 val dest = data.collect.toArray    
 spark.sparkContext.parallelize(dest, 1)

NOTE: Don't do that. I'd even call it a Spark anti-pattern: you load a dataset on the executors (from MySQL over JDBC) only to transfer this "large amount of data" to the driver, which then ships it back to the executors just so they can save it to disk.

It's as if you wanted to get rid of Spark's benefits by doing all these network round trips.

spark.read.jdbc supports partitioning your dataset at load time out of the box using the partitionColumn, lowerBound and upperBound options (see JDBC To Other Databases), or the (undocumented) predicates option.

partitionColumn, lowerBound, upperBound describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
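
Here is a minimal sketch of that approach for the table from the question; the partition column id, the bounds and the number of partitions are assumptions, so substitute a real numeric column and its actual min/max:

import java.util.Properties

// Connection settings copied from the question.
val url = "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false"
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")

// Partitioned read: Spark issues numPartitions parallel queries, each covering
// a slice of [lowerBound, upperBound] on partitionColumn.
val destination = spark.read.jdbc(
  url,
  "destination",   // plain table name, no LIMIT/OFFSET subquery needed
  "id",            // partitionColumn: assumed numeric column, replace with a real one
  1L,              // lowerBound: assumed
  1000000L,        // upperBound: assumed
  4,               // numPartitions
  connectionProperties)

// Save straight from the executors: no collect, no parallelize back to the cluster.
destination.rdd
  .map(_.mkString(","))
  .saveAsTextFile("/home/hduser/Desktop/testing")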

Let Spark do its job(s).
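
If a single numeric stride does not fit your data, the predicates variant mentioned above takes one WHERE clause per partition instead. A sketch with made-up ranges, reusing url and connectionProperties from the snippet above:

// Each predicate becomes the WHERE clause of one partition (ranges are hypothetical).
val predicates = Array(
  "id BETWEEN 1 AND 250000",
  "id BETWEEN 250001 AND 500000",
  "id BETWEEN 500001 AND 750000",
  "id > 750000")

spark.read.jdbc(url, "destination", predicates, connectionProperties)
  .rdd.map(_.mkString(","))
  .saveAsTextFile("/home/hduser/Desktop/testing_predicates")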

Upvotes: 2
