Reputation: 11285
I need some help with implementation best practices. The operating environment is as follows:
In a test, I tried to process 160,000 post-processed files with Spark, starting from sc.textFile() with a glob path, and it failed with an OutOfMemory exception on the driver process.
What is the best practice for handling this kind of data? Should I use HBase instead of plain files to store the post-processed data?
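For reference, here is a minimal sketch of the pattern I mean (the path is just a placeholder, not the real one):
// one glob over ~160,000 small files
val rdd = sc.textFile("hdfs:///data/post-processed/*")  // placeholder path
println(rdd.count())  // fails with an OutOfMemoryError on the driver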
Upvotes: 15
Views: 9234
Reputation: 273
I wrote my own loader, and it solved our problem with small files in HDFS. It uses Hadoop's CombineFileInputFormat. In our case it reduced the number of mappers from about 100,000 to roughly 3,000 and made the job significantly faster.
https://github.com/RetailRocket/SparkMultiTool
Example:
import ru.retailrocket.spark.multitool.Loaders
val sessions = Loaders.combineTextFile(sc, "file:///test/*")
// or: val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n")
// where size is the split size in megabytes and delim is the record delimiter
println(sessions.count())
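If you would rather not pull in an extra library, a rough equivalent (just a sketch, not tested against your setup; the path and split size are assumptions) is to use Hadoop's built-in CombineTextInputFormat directly through newAPIHadoopFile:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

// pack many small files into splits of at most ~256 MB (tune this value)
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024)

val sessions = sc.newAPIHadoopFile(
  "hdfs:///test/*",                 // placeholder path
  classOf[CombineTextInputFormat],
  classOf[LongWritable],
  classOf[Text]
).map(_._2.toString)

println(sessions.count())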
Upvotes: 9
Reputation: 2483
You can use the following approach.
First, get a Buffer/List of S3 paths (the same idea works for HDFS or a local path).
If you're working with Amazon S3:
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest
def listFiles(s3_bucket: String, base_prefix: String) = {
  var files = new ArrayList[String]

  // S3 client and ListObjects request
  val s3Client = new AmazonS3Client()
  var objectListing: ObjectListing = null
  val listObjectsRequest = new ListObjectsRequest()

  // Your S3 bucket
  listObjectsRequest.setBucketName(s3_bucket)
  // Your folder path or prefix
  listObjectsRequest.setPrefix(base_prefix)

  // Page through the listing, prepending s3:// to each key and adding it to the list
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      files.add("s3://" + s3_bucket + "/" + objectSummary.getKey())
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())

  // Remove the base directory entry itself
  files.remove(0)

  // Return the result as a Scala collection
  files.asScala
}
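For example (the bucket and prefix here are placeholders):
// hypothetical bucket and prefix, just to show the call
val files = listFiles("my-bucket", "post-processed/")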
Now pass this list to the following piece of code. Note: sc is the SparkContext, and the result is an RDD[String].
import org.apache.spark.rdd.RDD

var df: RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)
  if (df != null) {
    df = df.union(fileRdd)
  } else {
    df = fileRdd
  }
}
Now you have a single unified RDD, df.
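As a side note, a more compact sketch of the same union (assuming files is the list returned by listFiles above) uses SparkContext.union, which avoids chaining many pairwise unions:
// build one RDD from all paths in a single call
val unified = sc.union(files.map(path => sc.textFile(path)))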
Optionally, you can also repartition it into one big RDD:
val repartitioned = df.repartition(1)
Repartitioning always works :D
Upvotes: 1
Reputation: 31523
I'm pretty sure the reason you're getting OOM is that you're handling so many small files. What you want is to combine the input files so you don't get so many partitions. I try to limit my jobs to about 10k partitions.
After textFile, you can use .coalesce(10000, false) ... I'm not 100% sure that will work because it's been a while since I've done it, so please let me know. Try:
sc.textFile(path).coalesce(10000, false)
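A slightly fuller sketch of the same idea (the path is a placeholder, and 10000 is just the partition target I mentioned above):
// coalesce with shuffle = false only merges partitions, it never shuffles data
val raw = sc.textFile("hdfs:///post-processed/*")   // placeholder path
println(raw.partitions.length)        // roughly one partition per small file
val combined = raw.coalesce(10000, false)
println(combined.partitions.length)   // at most 10000
println(combined.count())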
Upvotes: 3