Reputation: 5524
I'm trying to read a large number of large files from S3, which takes considerable time when done through the DataFrame API. Following this post and the related gist, I'm trying to use an RDD to read the S3 objects in parallel, as below:
def dfFromS3Objects(s3: AmazonS3, bucket: String, prefix: String, pageLength: Int = 1000) = {
  import com.amazonaws.services.s3._
  import model._
  import spark.sqlContext.implicits._
  import scala.collection.JavaConversions._

  val request = new ListObjectsRequest()
  request.setBucketName(bucket)
  request.setPrefix(prefix)
  request.setMaxKeys(pageLength)

  // Note that this method returns truncated data if longer than the "pageLength" above.
  // You might need to deal with that.
  val objs: ObjectListing = s3.listObjects(request)

  spark.sparkContext.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
    .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
    .toDF()
}
When tested, this fails with:
Caused by: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client
Serialization stack:
- object not serializable (class: com.amazonaws.services.s3.AmazonS3Client, value: com.amazonaws.services.s3.AmazonS3Client@35c8be21)
- field (class: de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, name: s3$1, type: interface com.amazonaws.services.s3.AmazonS3)
- object (class de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more
I understand that the AmazonS3 object I supply has to be shipped to the executors and therefore needs to be serializable. But this comes from a sample snippet, which means someone got it working. What am I missing here?
Upvotes: 3
Views: 2554
Reputation: 4540
In the gist, s3 is defined as a method, which creates a new client on every call. This is not recommended. One way around the problem is to use mapPartitions:
spark
  .sparkContext
  .parallelize(objs.getObjectSummaries.map(_.getKey).toList)
  .mapPartitions { it =>
    val s3 = ... // init the client here
    it.flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
  }
  .toDF
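To make the per-partition pattern concrete without depending on Spark or the AWS SDK, here is a minimal sketch in plain Scala. The names PartitionReader, readLines, mkClient and open are hypothetical and only illustrate the idea: the client is built once per partition, on the executor, and each object's stream is closed once its lines have been consumed.

```scala
import java.io.InputStream
import scala.io.Source

object PartitionReader {
  // Reads all keys of one partition with a single client instance.
  // `mkClient` and `open` are hypothetical hooks for illustration;
  // they are not part of Spark or of the AWS SDK.
  def readLines[C](keys: Iterator[String])(mkClient: () => C)(
      open: (C, String) => InputStream): Iterator[String] = {
    // Built where the partition is processed, so it is never serialized.
    val client = mkClient()
    keys.flatMap { key =>
      val in = open(client, key)
      val lines = Source.fromInputStream(in, "UTF-8").getLines()
      // The right-hand side of ++ is evaluated lazily, so the stream is
      // closed only after the last line of this object has been read.
      lines ++ { in.close(); Iterator.empty }
    }
  }
}
```

Inside mapPartitions this could be called as PartitionReader.readLines(it)(() => AmazonS3ClientBuilder.defaultClient())((s3, key) => s3.getObject(bucket, key).getObjectContent), assuming the AWS SDK for Java 1.x and credentials that the default provider chain can resolve on the executors.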
This still creates multiple clients per JVM (one per partition), but potentially far fewer than the version that creates a client for every object. If you want to reuse the client across threads inside a JVM, you could, for example, wrap it in a top-level object
object Foo {
  val s3 = ...
}
and use static configuration for the client.
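As a rough illustration of why the top-level object helps: a reference to a member of a top-level object is resolved inside each JVM rather than captured in the closure, so nothing has to be serialized, and a lazy val is initialized at most once per JVM even when several task threads touch it at the same time. The sketch below uses a stand-in class instead of the real client so that it runs on its own; FakeClient, ClientHolder, initCount and the demo.region property are all hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for a heavyweight client such as AmazonS3 (hypothetical).
final class FakeClient(val region: String)

object ClientHolder {
  // Counts how often the client is constructed; for demonstration only.
  val initCount = new AtomicInteger(0)

  // "Static configuration": read from a system property with a default.
  // Scala guarantees that a lazy val is initialized at most once,
  // even under concurrent access from several threads.
  lazy val client: FakeClient = {
    initCount.incrementAndGet()
    new FakeClient(sys.props.getOrElse("demo.region", "eu-central-1"))
  }
}
```

In a real job the body of the lazy val would construct the actual client (for example with AmazonS3ClientBuilder.defaultClient(), assuming the AWS SDK for Java 1.x), and the closure would refer to ClientHolder.client directly instead of taking the client as a function argument.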
Upvotes: 2