puligun

Reputation: 362

Task not serializable - foreach function spark

I have a function getS3Object to get a json object stored in S3

def getS3Object(s3ObjectName: String): Unit = {
  val bucketName = "xyz"
  val object_to_write = s3client.getObject(bucketName, s3ObjectName)
  val file = new File(filename)
  val fileWriter = new FileWriter(file)
  val bw = new BufferedWriter(fileWriter)
  bw.write(object_to_write)
  bw.close()
  fileWriter.close()
}

My dataframe (df) contains one column where each row is the S3ObjectName

S3ObjectName
a1.json
b2.json
c3.json
d4.json
e5.json

When I execute the logic below, I get the error "Task not serializable"

Method 1:- df.foreach(x => getS3Object(x.getString(0)))

I tried converting the df to an RDD, but I still get the same error

Method 2:- df.rdd.foreach(x => getS3Object(x.getString(0)))

However, it works with collect()

Method 3:- df.collect.foreach(x => getS3Object(x.getString(0)))

I do not wish to use collect(), because it brings all the elements of the dataframe to the driver and can result in an OutOfMemory error.

Is there a way to make the foreach() function work using Method 1?

Upvotes: 1

Views: 966

Answers (1)

sarveshseri

Reputation: 13985

The problem with your s3Client can be solved as follows. But remember that these functions run on the executor nodes (other machines), so your whole val file = new File(filename) part is probably not going to work there.

You can put your files on a distributed file system such as HDFS or S3 instead.
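As a hedged sketch of that route, each executor can write the content it fetches straight to HDFS through Hadoop's FileSystem API (the output directory here is a placeholder, and this assumes the executors' Hadoop configuration points at the target cluster):

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write a string to an HDFS path. Safe to call inside foreach because
// everything here is constructed locally on the executor, so nothing
// needs to be serialized from the driver.
def writeToHdfs(content: String, fileName: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  val out = fs.create(new Path("/tmp/s3-dump/" + fileName)) // placeholder path
  try out.write(content.getBytes(StandardCharsets.UTF_8))
  finally out.close()
}
```
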

object S3ClientWrapper extends Serializable {
  // The client is created here, inside the object, so each executor JVM
  // builds its own instance when the object is first referenced —
  // nothing is serialized from the driver.
  val s3Client = {
    val awsCreds = new BasicAWSCredentials("access_key_id", "secret_key_id")

    AmazonS3ClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
      .build()
  }
}

def getS3Object(s3ObjectName: String): Unit = {
  val bucketName = "xyz"
  val object_to_write = S3ClientWrapper.s3Client.getObject(bucketName, s3ObjectName)
  // now you have to solve your file problem
}
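One hedged way to close that gap is to drain the object's content on the executor and hand it to whatever distributed sink you choose, rather than a local File. A minimal sketch (reading the content into a string is an assumption that the objects are small JSON files, as in the question); the driver-side call is then exactly Method 1:

```scala
import scala.io.Source

def getS3Object(s3ObjectName: String): Unit = {
  val bucketName = "xyz"
  val s3Object = S3ClientWrapper.s3Client.getObject(bucketName, s3ObjectName)
  // Read the body on the executor; the S3 object stream must be closed.
  val stream = s3Object.getObjectContent
  val content =
    try Source.fromInputStream(stream).mkString
    finally stream.close()
  // ... write `content` to HDFS or back to S3 instead of a local file ...
}

// Only the lambda is serialized now; the client is built on each executor.
df.foreach(row => getS3Object(row.getString(0)))
```

For many small objects, df.rdd.foreachPartition can amortize per-record overhead by handling a whole partition per call, but the serialization fix is the same.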


Upvotes: 1
