Taslim Arif
Taslim Arif

Reputation: 7

How to create large annotation using Dynamics 365 web api InitializeAnnotationBlocksUpload Action using Dynamics WEB API

How to create large annotation using Dynamics 365 web api InitializeAnnotationBlocksUpload Action using Dynamics WEB API.

I need to upload large file of size 130 MB in 25MB of chunks to avoid memory flow so that I can process multiple files at a time.

I have written below code which doesn't seem to work and fails at initialize_upload method.

import requests
import os


def initialize_upload(resource_url, access_token, entity_name, record_guid, file_attribute_name, file_name, file_size):
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    # Step 1: Initialize the upload
    initialize_url = f'{resource_url}/api/data/v9.2/InitializeAnnotationBlocksUpload'
    initialize_payload = {
        "Target": {
            "@odata.type": f"Microsoft.Dynamics.CRM.{entity_name}",
            "annotationid": record_guid,
            "[email protected]": "/accounts(4c9ae3f7-744b-ef11-accd-6045bd71fd81)"
        },
        "FileAttributeName": file_attribute_name,
        "FileName": file_name,
        "FileSizeInBytes": file_size
    }

    initialize_response = requests.post(initialize_url, headers=headers, json=initialize_payload)
    initialize_response.raise_for_status()
    return initialize_response.json()


def upload_chunk(resource_url, access_token, file_continuation_token, block_id, chunk):
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    upload_url = f'{resource_url}/api/data/v9.1/UploadBlock'
    upload_payload = {
        "FileContinuationToken": file_continuation_token,
        "BlockId": block_id,
        "BlockData": chunk
    }

    upload_response = requests.post(upload_url, headers=headers, json=upload_payload)
    upload_response.raise_for_status()


def complete_upload(resource_url, access_token, file_continuation_token):
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    complete_url = f'{resource_url}/api/data/v9.1/CompleteAnnotationBlocksUpload'
    complete_payload = {
        "FileContinuationToken": file_continuation_token
    }

    complete_response = requests.post(complete_url, headers=headers, json=complete_payload)
    complete_response.raise_for_status()


def upload_large_file(resource_url, access_token, entity_name, record_guid, file_attribute_name, file_path):
    file_name = os.path.basename(file_path)
    file_size = os.path.getsize(file_path)
    chunk_size = 25 * 1024 * 1024  # 25 MB

    # Step 1: Initialize the upload
    initialize_data = initialize_upload(resource_url, access_token, entity_name, record_guid, file_attribute_name,
                                        file_name, file_size)
    file_continuation_token = initialize_data['FileContinuationToken']

    # Step 2: Upload the file in chunks
    with open(file_path, 'rb') as file:
        block_id = 0
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            upload_chunk(resource_url, access_token, file_continuation_token, block_id, chunk)
            block_id += 1
            print(f'Uploaded chunk {block_id}')

    # Step 3: Complete the upload
    complete_upload(resource_url, access_token, file_continuation_token)
    print(f'File upload complete: {file_name}')


# Example usage
resource_url = "DYNAMICS_RESOURCE_URL"
access_token = "ACCESS_TOKEN"
entity_name = 'annotation'
record_guid = 'RECORD_GUID'  # This should be a GUID string
file_attribute_name = 'documentbody'
file_path = 'ddd.pdf'

upload_large_file(resource_url, access_token, entity_name, record_guid, file_attribute_name, file_path)

Upvotes: 0

Views: 22

Answers (0)

Related Questions