Reputation: 362
I have a function getS3Object to get a json object stored in S3
def getS3Object (s3ObjectName) : Unit = {
val bucketName = "xyz"
val object_to_write = s3client.getObject(bucketName, s3ObjectName)
val file = new File(filename)
fileWriter = new FileWriter(file)
bw = new BufferedWriter(fileWriter)
bw.write(object_to_write)
bw.close()
fileWriter.close()
}
My dataframe (df) contains one column where each row is the S3ObjectName
S3ObjectName |
---|
a1.json |
b2.json |
c3.json |
d4.json |
e5.json |
When I execute the below logic I get an error saying "task is not serializable"
Method 1:- df.foreach(x => getS3Object(x.getString(0))
I tried converting the df to rdd but still get the same error
Method 2:- df.rdd.foreach(x => getS3Object(x.getString(0))
However it works with collect()
Method 3:- df.collect.foreach(x => getS3Object(x.getString(0))
I do not wish to use the collect() method as all the elements of the dataframe are collected to the driver and potentially result in OutOfMemory error.
Is there a way to make the foreach() function work using Method 1?
Upvotes: 1
Views: 966
Reputation: 13985
The problem for your s3Client
can be solved as following. But you have to remember that these functions run on executor nodes (other machines), so your whole val file = new File(filename)
thing is probably not going to work here.
You can put your files
on some distibuted file system like HDFS or S3.
object S3ClientWrapper extends Serializable {
// s3Client must be created here.
val s3Client = {
val awsCreds = new BasicAWSCredentials("access_key_id", "secret_key_id")
AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build()
}
}
def getS3Object (s3ObjectName) : Unit = {
val bucketName = "xyz"
val object_to_write = S3ClientWrapper.s3Client.getObject(bucketName, s3ObjectName)
// now you have to solve your file problem
}
Upvotes: 1