Aadesh P

Reputation: 245

Why is this Python boto S3 multipart upload code not working?

I am trying to upload a 10 GB file to AWS S3. I was told to use S3 multipart upload, so I found this code in someone's GitHub gist:

import os
import sys
import glob
import subprocess
import contextlib
import functools
import multiprocessing
from multiprocessing.pool import IMapIterator
from optparse import OptionParser
from boto.s3.connection import S3Connection

#import rfc822

import boto

AWS_ACCESS_KEY_ID = 'KEY ID HERE'
AWS_SECRET_ACCESS_KEY = 'ACCESS KEY HERE'

def main(transfer_file, bucket_name, s3_key_name=None, use_rr=True,
         make_public=True, cores=None):
    if s3_key_name is None:
        s3_key_name = os.path.basename(transfer_file)

    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.lookup(bucket_name)
    if bucket is None:
        bucket = conn.create_bucket(bucket_name)

    mb_size = os.path.getsize(transfer_file) / 1e6
    if mb_size < 10:
        _standard_transfer(bucket, s3_key_name, transfer_file, use_rr)
    else:
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr,
                          cores)
    s3_key = bucket.get_key(s3_key_name)
    if make_public:
        s3_key.set_acl("public-read")

def upload_cb(complete, total):
    sys.stdout.write(".")
    sys.stdout.flush()

def _standard_transfer(bucket, s3_key_name, transfer_file, use_rr):
    print(" Upload with standard transfer, not multipart", end=' ')
    new_s3_item = bucket.new_key(s3_key_name)
    new_s3_item.set_contents_from_filename(transfer_file, reduced_redundancy=use_rr,
                                           cb=upload_cb, num_cb=10)
    print()

def map_wrap(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper

def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    """Get the multipart upload from the bucket and multipart IDs.
    This allows us to reconstitute a connection to the upload
    from within multiprocessing functions.
    """
    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp

@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    """Transfer a part of a multipart upload. Designed to be run in parallel.
    """
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print(" Transferring", i, part)
    with open(part) as t_handle:
        mp.upload_part_from_file(t_handle, i+1)
    os.remove(part)

def _multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True,
                      cores=None):
    """Upload large files using Amazon's multipart upload functionality.
    """
    def split_file(in_file, mb_size, split_num=5):
        prefix = os.path.join(os.path.dirname(in_file),
                              "%sS3PART" % (os.path.basename(s3_key_name)))
        # require a split size between 5 MB (AWS minimum) and 250 MB
        split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5))
        if not os.path.exists("%saa" % prefix):
            cl = ["split", "-b%sm" % split_size, in_file, prefix]
            subprocess.check_call(cl)
        return sorted(glob.glob("%s*" % prefix))

    mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    print(mp.id)
    print(mp.key_name)
    with multimap(cores) as pmap:
        for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))):
            pass

    mp.complete_upload()

@contextlib.contextmanager
def multimap(cores=None):
    """Provide multiprocessing imap like function.
    The context manager handles setting up the pool, working around interrupt issues,
    and terminating the pool on completion.
    """
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    yield pool.imap
    pool.terminate()

if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-r", "--norr", dest="use_rr",
                      action="store_false", default=True)
    parser.add_option("-p", "--public", dest="make_public",
                      action="store_true", default=False)
    parser.add_option("-c", "--cores", dest="cores",
                      default=multiprocessing.cpu_count())
    (options, args) = parser.parse_args()
    if len(args) < 2:
        print("No Args")
        sys.exit()
    kwargs = dict(use_rr=options.use_rr, make_public=options.make_public,
                  cores=int(options.cores))
    main(*args, **kwargs)

But it's not working, and I am not sure how to fix this error: "TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'"

EDIT:

Full Error Trace As Requested:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "test.py", line 53, in wrapper
    return f(*args, **kwargs)
TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 132, in <module>
    main(*args, **kwargs)
  File "test.py", line 34, in main
    cores)
  File "test.py", line 96, in _multipart_upload
    for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 689, in next
    raise value
TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'

Upvotes: 2

Views: 1705

Answers (2)

Siddarth

Reputation: 1020

I've had many problems with the boto library recently. The same code doesn't seem to work on one system while it works perfectly on others. I've switched from boto to boto3, and everything seems to work fine now.
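
For a large upload like yours, a minimal boto3 sketch might look like the following (the bucket and file names are placeholders, and the 64 MB settings are just illustrative). boto3's upload_file switches to multipart upload automatically once the file exceeds multipart_threshold, so you don't need to split the file or manage parts yourself:

import boto3
from boto3.s3.transfer import TransferConfig

# Multipart upload kicks in automatically above multipart_threshold;
# tune chunk size and concurrency instead of splitting the file by hand.
config = TransferConfig(multipart_threshold=64 * 1024 * 1024,
                        multipart_chunksize=64 * 1024 * 1024,
                        max_concurrency=4)

s3 = boto3.client('s3')
# 'bigfile.tar' and 'my-bucket' are placeholders for your file and bucket
s3.upload_file('bigfile.tar', 'my-bucket', 'bigfile.tar', Config=config)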

Upvotes: 0

John Rotenstein

Reputation: 269191

If it fits your use case, you may want to use the AWS Command-Line Interface (CLI), which can automatically use multipart upload for you.

aws s3 cp file.txt s3://bucket/file.txt
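
If you need to adjust how the CLI handles large files, its multipart behaviour can be tuned through the CLI's S3 configuration, for example (the 64MB values here are only illustrative):

aws configure set default.s3.multipart_threshold 64MB
aws configure set default.s3.multipart_chunksize 64MB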

Upvotes: 1
