Reputation: 245
I am trying to upload a 10 GB file to AWS S3. Someone suggested using S3 Multipart Upload, so I found someone's GitHub gist:
import os
import sys
import glob
import subprocess
import contextlib
import functools
import multiprocessing
from multiprocessing.pool import IMapIterator
from optparse import OptionParser
from boto.s3.connection import S3Connection
#import rfc822
import boto

AWS_ACCESS_KEY_ID = 'KEY ID HERE'
AWS_SECRET_ACCESS_KEY = 'ACCESS KEY HERE'

def main(transfer_file, bucket_name, s3_key_name=None, use_rr=True,
         make_public=True, cores=None):
    if s3_key_name is None:
        s3_key_name = os.path.basename(transfer_file)
    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.lookup(bucket_name)
    if bucket is None:
        bucket = conn.create_bucket(bucket_name)
    mb_size = os.path.getsize(transfer_file) / 1e6
    if mb_size < 10:
        _standard_transfer(bucket, s3_key_name, transfer_file, use_rr)
    else:
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr,
                          cores)
    s3_key = bucket.get_key(s3_key_name)
    if make_public:
        s3_key.set_acl("public-read")

def upload_cb(complete, total):
    sys.stdout.write(".")
    sys.stdout.flush()

def _standard_transfer(bucket, s3_key_name, transfer_file, use_rr):
    print(" Upload with standard transfer, not multipart", end=' ')
    new_s3_item = bucket.new_key(s3_key_name)
    new_s3_item.set_contents_from_filename(transfer_file, reduced_redundancy=use_rr,
                                           cb=upload_cb, num_cb=10)
    print()

def map_wrap(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper

def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    """Get the multipart upload from the bucket and multipart IDs.
    This allows us to reconstitute a connection to the upload
    from within multiprocessing functions.
    """
    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp

@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    """Transfer a part of a multipart upload. Designed to be run in parallel.
    """
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print(" Transferring", i, part)
    with open(part) as t_handle:
        mp.upload_part_from_file(t_handle, i+1)
    os.remove(part)

def _multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True,
                      cores=None):
    """Upload large files using Amazon's multipart upload functionality.
    """
    def split_file(in_file, mb_size, split_num=5):
        prefix = os.path.join(os.path.dirname(in_file),
                              "%sS3PART" % (os.path.basename(s3_key_name)))
        # require a split size between 5Mb (AWS minimum) and 250Mb
        split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5))
        if not os.path.exists("%saa" % prefix):
            cl = ["split", "-b%sm" % split_size, in_file, prefix]
            subprocess.check_call(cl)
        return sorted(glob.glob("%s*" % prefix))

    mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    print(mp.id)
    print(mp.key_name)
    with multimap(cores) as pmap:
        for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part)
                                      for (i, part) in enumerate(split_file(tarball, mb_size, cores)))):
            pass
    mp.complete_upload()

@contextlib.contextmanager
def multimap(cores=None):
    """Provide multiprocessing imap like function.
    The context manager handles setting up the pool, worked around interrupt issues
    and terminating the pool on completion.
    """
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    yield pool.imap
    pool.terminate()

if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-r", "--norr", dest="use_rr",
                      action="store_false", default=True)
    parser.add_option("-p", "--public", dest="make_public",
                      action="store_true", default=False)
    parser.add_option("-c", "--cores", dest="cores",
                      default=multiprocessing.cpu_count())
    (options, args) = parser.parse_args()
    if len(args) < 2:
        print("No Args")
        sys.exit()
    kwargs = dict(use_rr=options.use_rr, make_public=options.make_public,
                  cores=int(options.cores))
    main(*args, **kwargs)
But it's not working, and I am not sure how to fix this error: "TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'"
EDIT:
Full Error Trace As Requested:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "test.py", line 53, in wrapper
    return f(*args, **kwargs)
TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 132, in <module>
    main(*args, **kwargs)
  File "test.py", line 34, in main
    cores)
  File "test.py", line 96, in _multipart_upload
    for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 689, in next
    raise value
TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'
Upvotes: 2
Views: 1705
Reputation: 1020
I've had many problems with the boto library recently. The same code doesn't seem to work on one system even though it works perfectly on others. I've switched from boto to boto3, and everything seems to work fine now.
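For reference, here is a minimal boto3 sketch. The bucket name, key, and file name are placeholders, and the transfer settings are only example values; boto3's upload_file switches to multipart upload automatically once the file size exceeds the configured threshold:

import boto3
from boto3.s3.transfer import TransferConfig

# Multipart upload kicks in automatically above multipart_threshold;
# these sizes and the concurrency level are just example values.
config = TransferConfig(multipart_threshold=64 * 1024 * 1024,
                        multipart_chunksize=64 * 1024 * 1024,
                        max_concurrency=4)

s3 = boto3.client("s3")
# "my-bucket" and "bigfile.tar" are placeholders for your own bucket and file.
s3.upload_file("bigfile.tar", "my-bucket", "bigfile.tar", Config=config)

This also spares you from splitting the file on disk and re-uploading parts yourself, since the transfer manager handles part splitting and parallelism internally.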
Upvotes: 0
Reputation: 269191
If it fits your use case, you may want to use the AWS Command-Line Interface (CLI), which can automatically use multi-part upload for you.
aws s3 cp file.txt s3://bucket/file.txt
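If the defaults don't suit a 10 GB upload, the CLI's S3 transfer settings can also be tuned; the values below are only examples:

aws configure set default.s3.multipart_chunksize 64MB
aws configure set default.s3.max_concurrent_requests 10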
Upvotes: 1