Somasundaram Sekar

Reputation: 5524

Spark: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client

I'm trying to read a large number of large files from S3, which takes considerable time when done through the DataFrame API. So, following this post and the related gist, I'm trying to use an RDD to read the S3 objects in parallel, as below:

def dfFromS3Objects(s3: AmazonS3, bucket: String, prefix: String, pageLength: Int = 1000) = {
    import com.amazonaws.services.s3._
    import model._
    import spark.sqlContext.implicits._

    import java.io.InputStream
    import scala.collection.JavaConversions._
    import scala.io.Source

    val request = new ListObjectsRequest()
    request.setBucketName(bucket)
    request.setPrefix(prefix)
    request.setMaxKeys(pageLength)

    val objs: ObjectListing = s3.listObjects(request) // Note: this returns a truncated listing if there are more keys than "pageLength" above; you might need to deal with that.

    spark.sparkContext.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
      .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }.toDF()
  }

which, when tested, fails with

Caused by: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client
Serialization stack:
    - object not serializable (class: com.amazonaws.services.s3.AmazonS3Client, value: com.amazonaws.services.s3.AmazonS3Client@35c8be21)
    - field (class: de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, name: s3$1, type: interface com.amazonaws.services.s3.AmazonS3)
    - object (class de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    ... 63 more

I understand that the AmazonS3 object I supply needs to be shipped to the executors, and therefore needs to be serializable, but this comes from a sample snippet, meaning someone got it working. I need help figuring out what I am missing here.

Upvotes: 3

Views: 2554

Answers (1)

ollik1

Reputation: 4540

In the gist, s3 is defined as a method, so a new client is created for every call. This is not recommended. One way around the problem is to use mapPartitions:

spark
  .sparkContext
  .parallelize(objs.getObjectSummaries.map(_.getKey).toList)
  .mapPartitions { it =>
    val s3 = ... // init the client here
    it.flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
  }
  .toDF
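For example, the `val s3 = ...` line could be filled in with the SDK v1 builder; the region here is just a placeholder, adjust it to your setup:

import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

// built inside mapPartitions, i.e. on the executor, so it never has to be serialized
val s3: AmazonS3 = AmazonS3ClientBuilder.standard()
  .withRegion("eu-central-1") // placeholder region
  .build()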

This would still create multiple clients per JVM, but possibly far fewer than the version that creates a client for every object. If you want to reuse the client between threads inside a JVM, you could, for example, wrap it in a top-level object

object Foo {
  val s3 = ...
}

and use static configuration for the client.
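To make that concrete, one way to fill in Foo is to rely on the SDK's default credential and region provider chains (this is just one option for the static configuration):

import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

object Foo {
  // initialised once per executor JVM on first access,
  // using the default credential and region provider chains
  val s3: AmazonS3 = AmazonS3ClientBuilder.defaultClient()
}

Referencing Foo.s3 inside the mapPartitions closure then avoids serializing the client, because the object is initialised independently on each executor JVM rather than shipped from the driver.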

Upvotes: 2
