Reputation: 832
I was looking at the AWS golang documentation for the S3 CopyObject function; it contains the following details for handling large file uploads:
However, to copy an object greater than 5 GB, you must use the multipart upload Upload Part - Copy API. For more information, see Copy Object Using the REST Multipart Upload API (https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html).
When I follow that link, it only contains code examples for Java and .NET.
Am I missing some documentation/example somewhere showing how to copy an existing large file in S3 using the golang client?
Upvotes: 4
Views: 4201
Reputation: 354
The two other solutions are quite slow. Using goroutines can speed this up significantly, since most of the wait time is on the AWS side.
Here's a copy of the V1 solution Mike provided, but using goroutines (and properly getting the fileSize, which seems to be something he left out).
// imports needed for this example (AWS SDK for Go V1, github.com/aws/aws-sdk-go)
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// constant for the size in bytes of each copied part (5 MiB)
const max_part_size = 5 * 1024 * 1024
// helper function to build the string for the inclusive byte range to copy
func buildCopySourceRange(start int64, objectSize int64) string {
end := start + max_part_size - 1
if end >= objectSize {
end = objectSize - 1
}
startRange := strconv.FormatInt(start, 10)
stopRange := strconv.FormatInt(end, 10)
return "bytes=" + startRange + "-" + stopRange
}
// function that starts the multipart upload, performs each part copy, and completes the copy
func MultiPartCopy(sess *session.Session, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
svc := s3.New(sess, aws.NewConfig().WithRegion(S3_REGION)) // S3_REGION is assumed to be defined elsewhere, e.g. "us-east-1"
ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
defer cancelFn()
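//look up the size of the source object so the number of parts can be computed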
headInput := s3.HeadObjectInput{
Bucket: aws.String(sourceBucket),
Key: aws.String(sourceKey),
}
result, err := svc.HeadObject(&headInput)
if err != nil {
return err
}
fileSize := *result.ContentLength
//send command to start copy and get the upload id as it is needed later
var uploadId string
startInput := s3.CreateMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
}
createOutput, err := svc.CreateMultipartUploadWithContext(ctx, &startInput)
if err != nil {
return err
}
if createOutput != nil {
if createOutput.UploadId != nil {
uploadId = *createOutput.UploadId
}
}
if uploadId == "" {
return errors.New("No upload id found in start upload request")
}
var i int64
var partNumberCounter int64 = 1
copySource := "/" + sourceBucket + "/" + sourceKey
numUploads := (fileSize + max_part_size - 1) / max_part_size
fmt.Println("Will attempt upload in ", numUploads, " number of parts to ", destKey)
type Result struct {
Error error
Part *s3.CompletedPart
PartNumber int64
}
var wg sync.WaitGroup
ch := make(chan Result, numUploads)
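//kick off one goroutine per part; each sends its result back on the buffered channel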
for i = 0; i < fileSize; i += max_part_size {
wg.Add(1)
go func(wg *sync.WaitGroup, partStart int64, partNumber int64) {
defer wg.Done()
var err error
var part *s3.CompletedPart
copyRange := buildCopySourceRange(partStart, fileSize)
partInput := s3.UploadPartCopyInput{
Bucket: &destBucket,
CopySource: &copySource,
CopySourceRange: &copyRange,
Key: &destKey,
PartNumber: &partNumber,
UploadId: &uploadId,
}
fmt.Println("Attempting to upload part", partNumber, "range:", copyRange)
partResp, err := svc.UploadPartCopy(&partInput)
if err != nil {
err = fmt.Errorf("Error uploading part %d : %w", partNumber, err)
} else if partResp != nil {
//copy etag and part number from response as it is needed for completion
partNum := partNumber
etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
cPart := s3.CompletedPart{
ETag: &etag,
PartNumber: &partNum,
}
fmt.Println("Successfully uploaded part", partNumber, "of", uploadId)
part = &cPart
}
ch <- Result{Error: err, Part: part, PartNumber: partNumber}
}(&wg, i, partNumberCounter)
partNumberCounter += 1
}
wg.Wait()
close(ch)
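//collect the results, aborting the whole copy on the first error and ordering parts by part number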
orderedParts := make([]*s3.CompletedPart, numUploads)
for r := range ch {
if r.Error != nil {
fmt.Println("Attempting to abort upload")
abortIn := s3.AbortMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
UploadId: &uploadId,
}
//ignoring any errors with aborting the copy
svc.AbortMultipartUpload(&abortIn)
return r.Error
}
if r.Part != nil {
orderedParts[r.PartNumber-1] = r.Part
}
}
// Filter out any nils from the parts list
parts := make([]*s3.CompletedPart, 0)
for _, part := range orderedParts {
if part != nil {
parts = append(parts, part)
}
}
//create struct for completing the upload
mpu := s3.CompletedMultipartUpload{
Parts: parts,
}
//complete actual upload
//does not actually copy if the complete command is not received
complete := s3.CompleteMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
UploadId: &uploadId,
MultipartUpload: &mpu,
}
compOutput, err := svc.CompleteMultipartUpload(&complete)
if err != nil {
return fmt.Errorf("Error completing upload: %w", err)
}
if compOutput != nil {
fmt.Println("Successfully copied Bucket:", sourceBucket, "Key:", sourceKey, "to Bucket:", destBucket, "Key:", destKey)
}
return nil
}
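For reference, calling this might look roughly like the following sketch. This is only an illustration: the region, bucket names, and object keys are placeholders, and it assumes the MultiPartCopy function above lives in the same package.

package main

import (
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
)

// placeholder region; MultiPartCopy above expects this constant to exist
const S3_REGION = "us-east-1"

func main() {
// create a session; credentials come from the usual environment/config chain
sess, err := session.NewSession(&aws.Config{Region: aws.String(S3_REGION)})
if err != nil {
log.Fatal(err)
}
// copy a large object between two (placeholder) buckets using the function above
if err := MultiPartCopy(sess, "source-bucket", "path/to/large-object", "dest-bucket", "path/to/copy"); err != nil {
log.Fatal(err)
}
}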
Upvotes: 1
Reputation: 308
The same approach as @Mike's answer but using AWS-SDK-GO-V2:
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"

"logger" // non-standard import path: presumably the author's own logging package
)
//constant for the size in bytes of each copied part (5 MiB)
const max_part_size = 5 * 1024 * 1024
var log *logger.Logger
//helper function to build the string for the inclusive byte range to copy
func buildCopySourceRange(start int64, objectSize int64) string {
end := start + max_part_size - 1
if end >= objectSize {
end = objectSize - 1
}
startRange := strconv.FormatInt(start, 10)
stopRange := strconv.FormatInt(end, 10)
return "bytes=" + startRange + "-" + stopRange
}
//function that starts the multipart upload, performs each part copy, and completes the copy
func MultiPartCopy(svc *s3.Client, fileSize int64, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
log = logger.GetLogger()
ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
defer cancelFn()
//struct for starting a multipart upload
startInput := s3.CreateMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
}
//send command to start copy and get the upload id as it is needed later
var uploadId string
createOutput, err := svc.CreateMultipartUpload(ctx, &startInput)
if err != nil {
return err
}
if createOutput != nil {
if createOutput.UploadId != nil {
uploadId = *createOutput.UploadId
}
}
if uploadId == "" {
return errors.New("No upload id found in start upload request")
}
var i int64
var partNumber int32 = 1
copySource := "/" + sourceBucket + "/" + sourceKey
parts := make([]types.CompletedPart, 0)
numUploads := (fileSize + max_part_size - 1) / max_part_size
log.Infof("Will attempt upload in %d number of parts to %s", numUploads, destKey)
for i = 0; i < fileSize; i += max_part_size {
copyRange := buildCopySourceRange(i, fileSize)
partInput := s3.UploadPartCopyInput{
Bucket: &destBucket,
CopySource: &copySource,
CopySourceRange: &copyRange,
Key: &destKey,
PartNumber: partNumber,
UploadId: &uploadId,
}
log.Debugf("Attempting to upload part %d range: %s", partNumber, copyRange)
partResp, err := svc.UploadPartCopy(context.TODO(), &partInput)
if err != nil {
log.Error("Attempting to abort upload")
abortIn := s3.AbortMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
UploadId: &uploadId,
}
//ignoring any errors with aborting the copy
svc.AbortMultipartUpload(context.TODO(), &abortIn)
return fmt.Errorf("Error uploading part %d : %w", partNumber, err)
}
//copy etag and part number from response as it is needed for completion
if partResp != nil {
partNum := partNumber
etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
cPart := types.CompletedPart{
ETag: &etag,
PartNumber: partNum,
}
parts = append(parts, cPart)
log.Debugf("Successfully upload part %d of %s", partNumber, uploadId)
}
partNumber++
if partNumber%50 == 0 {
log.Infof("Completed part %d of %d to %s", partNumber, numUploads, destKey)
}
}
//create struct for completing the upload
mpu := types.CompletedMultipartUpload{
Parts: parts,
}
//complete actual upload
//does not actually copy if the complete command is not received
complete := s3.CompleteMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
UploadId: &uploadId,
MultipartUpload: &mpu,
}
compOutput, err := svc.CompleteMultipartUpload(context.TODO(), &complete)
if err != nil {
return fmt.Errorf("Error completing upload: %w", err)
}
if compOutput != nil {
log.Infof("Successfully copied Bucket: %s Key: %s to Bucket: %s Key: %s", sourceBucket, sourceKey, destBucket, destKey)
}
return nil
}
@Mike, one question: you used AbortMultipartUploadRequest,
which is not present in AWS-SDK-GO-V2, so I used AbortMultipartUpload
instead; I hope it doesn't make much of a difference?
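For comparison, the two call styles differ roughly like this (a sketch only; svc and abortIn stand for the respective V1/V2 client and input structs, which are not interchangeable between the SDKs):

//V1 (github.com/aws/aws-sdk-go): the *Request variant only builds the request object;
//nothing is sent unless Send() is called on the returned request
req, _ := svc.AbortMultipartUploadRequest(&abortIn)
err := req.Send()

//V1 also has a direct form that performs the call immediately
_, err = svc.AbortMultipartUpload(&abortIn)

//V2 (github.com/aws/aws-sdk-go-v2): only the direct, context-aware form exists
_, err = svc.AbortMultipartUpload(context.TODO(), &abortIn)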
Upvotes: 6
Reputation: 832
So it took some experimenting, but I finally got the multipart copy working.
//imports
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
)
//constant for the size in bytes of each copied part (5 MiB)
const max_part_size = 5 * 1024 * 1024
//helper function to build the string for the inclusive byte range to copy
func buildCopySourceRange(start int64, objectSize int64) string {
end := start + max_part_size - 1
if end >= objectSize {
end = objectSize - 1
}
startRange := strconv.FormatInt(start, 10)
stopRange := strconv.FormatInt(end, 10)
return "bytes=" + startRange + "-" + stopRange
}
//function that starts the multipart upload, performs each part copy, and completes the copy
func MultiPartCopy(sess *session.Session, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
svc := s3.New(sess)
ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
defer cancelFn()
//struct for starting a multipart upload
startInput := s3.CreateMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
}
//send command to start copy and get the upload id as it is needed later
var uploadId string
createOutput, err := svc.CreateMultipartUploadWithContext(ctx, &startInput)
if err != nil {
return err
}
if createOutput != nil {
if createOutput.UploadId != nil {
uploadId = *createOutput.UploadId
}
}
if uploadId == "" {
return errors.New("No upload id found in start upload request")
}
var i int64
var partNumber int64 = 1
copySource := "/" + sourceBucket + "/" + sourceKey
parts := make([]*s3.CompletedPart, 0)
//fileSize (the size in bytes of the source object) is assumed to be obtained beforehand, e.g. via a HeadObject call
numUploads := (fileSize + max_part_size - 1) / max_part_size
log.Infof("Will attempt upload in %d number of parts to %s", numUploads, destKey)
for i = 0; i < fileSize; i += max_part_size {
copyRange := buildCopySourceRange(i, fileSize)
partInput := s3.UploadPartCopyInput{
Bucket: &destBucket,
CopySource: &copySource,
CopySourceRange: &copyRange,
Key: &destKey,
PartNumber: &partNumber,
UploadId: &uploadId,
}
log.Debugf("Attempting to upload part %d range: %s", partNumber, copyRange)
partResp, err := svc.UploadPartCopy(&partInput)
if err != nil {
log.Error("Attempting to abort upload")
abortIn := s3.AbortMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
UploadId: &uploadId,
}
//ignoring any errors with aborting the copy
svc.AbortMultipartUploadRequest(&abortIn)
return fmt.Errorf("Error uploading part %d : %w", partNumber, err)
}
//copy etag and part number from response as it is needed for completion
if partResp != nil {
partNum := partNumber
etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
cPart := s3.CompletedPart{
ETag: &etag,
PartNumber: &partNum,
}
parts = append(parts, &cPart)
log.Debugf("Successfully upload part %d of %s", partNumber, uploadId)
}
partNumber++
if partNumber%50 == 0 {
log.Infof("Completed part %d of %d to %s", partNumber, numUploads, destKey)
}
}
//create struct for completing the upload
mpu := s3.CompletedMultipartUpload{
Parts: parts,
}
//complete actual upload
//does not actually copy if the complete command is not received
complete := s3.CompleteMultipartUploadInput{
Bucket: &destBucket,
Key: &destKey,
UploadId: &uploadId,
MultipartUpload: &mpu,
}
compOutput, err := svc.CompleteMultipartUpload(&complete)
if err != nil {
return fmt.Errorf("Error completing upload: %w", err)
}
if compOutput != nil {
log.Infof("Successfully copied Bucket: %s Key: %s to Bucket: %s Key: %s", sourceBucket, sourceKey, destBucket, destKey)
}
return nil
}
Upvotes: 2