Andy

Reputation: 10988

How to stream Azure Blob to AWS S3?

I need to copy a large Azure Blob to AWS S3 without keeping a copy of it in memory. After some Googling I found a bunch of examples that I combined into the following script. It still loads the data into memory, though. Is there a good way to avoid that?

import io
import shutil

import boto3
from azure.storage.blob import BlobClient

with io.BytesIO() as input_stream, io.BytesIO() as output_stream:
    blob_client = BlobClient.from_connection_string(
        conn_str=AZURE_CONNECTION_STRING,
        container_name=container,
        blob_name=filename,
    )
    blob_client.download_blob().readinto(input_stream)

    input_stream.seek(0)
    shutil.copyfileobj(input_stream, output_stream)
    output_stream.seek(0)

    boto3.resource("s3").Object(BUCKET_NAME, s3_key).put(Body=output_stream)

Upvotes: 5

Views: 3279

Answers (3)

Akash Sharma

Reputation: 136

There is a very simple way to do this:

import tempfile

blob_client = ...
s3_client = ...

with tempfile.NamedTemporaryFile() as temp_file:
    # download the blob one chunk at a time and flush each chunk to disk
    for chunk in blob_client.download_blob().chunks():
        temp_file.write(chunk)
        temp_file.flush()

    # upload_file automatically switches to multipart upload for large files
    s3_client.upload_file(temp_file.name, s3_bucket, s3_key)
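
If you need control over when boto3 switches to a multipart upload and how large each part is, upload_file also accepts a TransferConfig. A minimal sketch, assuming placeholder path, bucket, and key names (none of these values come from the snippet above):

import boto3
from boto3.s3.transfer import TransferConfig

s3_client = boto3.client("s3")

# illustrative values: switch to multipart above 8 MB and use 8 MB parts
config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,
    multipart_chunksize=8 * 1024 * 1024,
)

# "/tmp/example-download" stands in for temp_file.name from the snippet above;
# "my-bucket" and "my-key" are placeholders
s3_client.upload_file("/tmp/example-download", "my-bucket", "my-key", Config=config)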

Upvotes: 0

zborna

Reputation: 1

Based on samu's answer, here is a working example. The missing part was completing the multipart upload:

from azure.storage.blob import BlobClient


def copy_from_azure_to_s3(conn_str: str, container_name: str, file_name: str, bucket_name: str, s3):

    # initiate the Azure client
    blob_client = BlobClient.from_connection_string(
        conn_str=conn_str,
        container_name=container_name,
        blob_name=file_name,
        # the minimum part size for an S3 multipart upload is 5 MB, so the chunk size needs to be at least that
        max_chunk_get_size=50 * 1024 * 1024,
    )

    # initiate the multipart upload
    mpu = s3.create_multipart_upload(Bucket=bucket_name, Key=file_name)
    mpu_id = mpu["UploadId"]

    blob = blob_client.download_blob()

    # store info about the individual parts
    etags = []

    # stream it to S3
    for part_num, chunk in enumerate(blob.chunks(), start=1):
        response = s3.upload_part(
            Body=chunk,
            Bucket=bucket_name,
            Key=file_name,
            UploadId=mpu_id,
            PartNumber=part_num,
        )
        etags.append({'ETag': response['ETag'], 'PartNumber': part_num})

    # finish the upload
    s3.complete_multipart_upload(
        Bucket=bucket_name,
        Key=file_name,
        UploadId=mpu_id,
        MultipartUpload={
            'Parts': etags
        },
    )
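
For reference, a call to this function might look like the following; the connection string, container, file, and bucket names below are placeholders rather than values from this answer, and s3 is a plain boto3 client:

import boto3

s3 = boto3.client("s3")

copy_from_azure_to_s3(
    conn_str="<azure-storage-connection-string>",  # placeholder
    container_name="my-container",                 # placeholder
    file_name="large-file.bin",                    # placeholder
    bucket_name="my-bucket",                       # placeholder
    s3=s3,
)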

Upvotes: 0

samu

Reputation: 3120

The copy of the blob is in memory because you're reading it in one go. You're initializing two instances of io.BytesIO, but then you're reading the entire blob using blob_client.download_blob().readinto(input_stream).

What I think you should try instead is reading (and putting) chunks of the blob, one chunk at a time, avoiding reading the entirety of it to memory.

On the upload side (S3), you can approach the issue in two ways. You can either:

  • Use the S3 multipart upload mechanism (call .create_multipart_upload() to initiate the upload, then .upload_part() to upload each part/chunk), or
  • Provide a file-like object to .upload_fileobj() that is responsible for producing one chunk at a time (see the sketch at the end of this answer)

As far as I can tell, blob_client.download_blob() already returns a file-like object, StorageStreamDownloader, that implements a chunks() method. I can't find proper documentation for it, but according to the source code it returns an iterator that you can use.

Therefore, consider something like this (I don't have access to any azure/s3 service at this very moment, so this code might not work out of the box):

import boto3
from azure.storage.blob import BlobClient

blob_client = BlobClient.from_connection_string(
    conn_str=AZURE_CONNECTION_STRING,
    container_name=container,
    blob_name=filename,
)
s3 = boto3.client('s3')

mpu = s3.create_multipart_upload(Bucket=BUCKET_NAME, Key=s3_key)
mpu_id = mpu["UploadId"]

blob = blob_client.download_blob()
# S3 part numbers start at 1, not 0
for part_num, chunk in enumerate(blob.chunks(), start=1):
    s3.upload_part(
        Body=chunk,
        Bucket=BUCKET_NAME,
        Key=s3_key,
        UploadId=mpu_id,
        PartNumber=part_num,
    )

# note that the upload still has to be finalized with complete_multipart_upload()

Like I mentioned, I have no access to any blob storage/S3 resource right now, so I eyeballed the code. But the general idea should be the same. By using .chunks() on the blob, you should only fetch a small chunk of the data into memory, upload it (using MPU) to S3, and discard it immediately.
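
For the second option from the list above (.upload_fileobj()), a rough sketch could look like the code below. The ChunkStream wrapper is a hypothetical helper I'm making up here, not something from either SDK, and it is just as untested as the code above:

import boto3
from azure.storage.blob import BlobClient


class ChunkStream:
    """Minimal file-like wrapper over StorageStreamDownloader.chunks();
    it exposes only the read() method that upload_fileobj needs."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self._buffer = b""
        self._exhausted = False

    def read(self, size=-1):
        # keep pulling chunks until we have `size` bytes or run out of blob
        while not self._exhausted and (size < 0 or len(self._buffer) < size):
            try:
                self._buffer += next(self._chunks)
            except StopIteration:
                self._exhausted = True
        if size < 0:
            data, self._buffer = self._buffer, b""
        else:
            data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data


blob_client = BlobClient.from_connection_string(
    conn_str=AZURE_CONNECTION_STRING,
    container_name=container,
    blob_name=filename,
)
s3 = boto3.client("s3")

# upload_fileobj reads the wrapped stream chunk by chunk and takes care of the
# multipart upload (including completing it) internally
stream = ChunkStream(blob_client.download_blob().chunks())
s3.upload_fileobj(stream, BUCKET_NAME, s3_key)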

Upvotes: 3
