Vexbeex

Reputation: 1

RuntimeWarning: coroutine 'process_all_documents' was never awaited chunk_documents.process_all_documents()

I'm trying to fetch some documents from Azure and run Document Intelligence on them. The original script was taking 14 hours to run. I tried to optimize it and am now getting errors.

Error:

RuntimeWarning: coroutine 'process_all_documents' was never awaited
  chunk_documents.process_all_documents()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
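
For reference, that warning means a coroutine object was created but never awaited or handed to an event loop; if the script below is imported elsewhere and invoked as chunk_documents.process_all_documents(), the call site likely needs something like this (a minimal sketch, assuming the module is named chunk_documents):

import asyncio
import chunk_documents

# Calling an async def function directly only creates a coroutine object;
# nothing runs, which is exactly what triggers the RuntimeWarning:
# chunk_documents.process_all_documents()

# From synchronous code, hand the coroutine to an event loop instead:
asyncio.run(chunk_documents.process_all_documents())

# Or, from inside another coroutine:
# await chunk_documents.process_all_documents()

Here is the full script: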

import asyncio
import aiohttp
import csv
import config
import extract_metadata
from azure.storage.blob.aio import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer.aio import DocumentAnalysisClient
from azure.core.exceptions import HttpResponseError

async def list_pdfs(container_name, max_files):
    async with BlobServiceClient.from_connection_string(config.AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)
        pdf_files = []

        async for blob in container_client.list_blobs():
            if blob.name.endswith(".pdf"):
                pdf_files.append(blob.name)
            if max_files and len(pdf_files) >= max_files:
                break

        return pdf_files


async def analyze_document_from_blob(blob_url, client):
    try:
        poller = await client.begin_analyze_document_from_url(model_id="prebuilt-layout", document_url=blob_url)
        result = await poller.result()

        full_text = "\n".join([line.content for page in result.pages for line in page.lines])
        return full_text

    except HttpResponseError as e:
        print(f"Error analyzing document from URL {blob_url}: {e.message}")
        return None


def chunk_text(text, chunk_size=6000):
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]


async def upload_to_blob(local_file_path, container_name, blob_name):
    async with BlobServiceClient.from_connection_string(config.AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.put(blob_client.url, data=open(local_file_path, "rb")) as response:
                    if response.status == 201:
                        print(f"Uploaded {local_file_path} to {container_name}/{blob_name}")
                    else:
                        print(f"Failed to upload {local_file_path}. Status code: {response.status}")
        except Exception as e:
            print(f"Error uploading {local_file_path}: {e}")


async def process_document(filename, metadata_dict, document_client):
    metadata = metadata_dict.get(filename)
    if metadata is None:
        print(f"Skipping {filename} (no metadata found).")
        return None

    async with BlobServiceClient.from_connection_string(config.AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container="content", blob=filename)
        extracted_text = await analyze_document_from_blob(blob_client.url, document_client)

    if not extracted_text:
        print(f"Skipping {filename} (analysis failed).")
        return None

    chunks = chunk_text(extracted_text)
    # Define a fixed list of metadata keys
    metadata_keys = ["idx", "PartitionKey", "RowKey", "timestamp", "title", "summary",
                     "sourcefile", "documenttypes", "organizations", "agencies", "programs", "keywords"]
    rows = [[filename, i, chunk] + [metadata.get(key, "") for key in metadata_keys] for i, chunk in enumerate(chunks)]
    print(f"{filename} processed with {len(chunks)} chunks.")
    return rows


async def process_all_documents():
    metadata_dict = extract_metadata.load_metadata_from_blob()
    pdf_files = await list_pdfs("content", None)

    endpoint = config.AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT
    api_key = config.AZURE_DOCUMENT_INTELLIGENCE_KEY

    async with DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(api_key)) as document_client:
        semaphore = asyncio.Semaphore(10)  # Limit to 10 concurrent tasks
        tasks = [process_document(filename, metadata_dict, document_client) for filename in pdf_files]
        results = await asyncio.gather(*tasks)

    with open("document_chunks.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "chunk_id", "text"] + list(metadata_dict.keys()))

        for result in results:
            if result:
                writer.writerows(result)

    await upload_to_blob("document_chunks.csv", "metadata", "metadata_iris/document_chunks.csv")

    print("Processing completed.")


async def main():
    await process_all_documents()

if __name__ == "__main__":
    asyncio.run(main())
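
As an aside, the Semaphore(10) created in process_all_documents above is never actually used, so all documents are still analyzed at once. A minimal sketch of one way to apply it (process_with_limit is a hypothetical helper name, not part of the original script):

async def process_with_limit(semaphore, filename, metadata_dict, document_client):
    # Acquire the semaphore so at most 10 documents are analyzed concurrently.
    async with semaphore:
        return await process_document(filename, metadata_dict, document_client)

# Inside process_all_documents:
semaphore = asyncio.Semaphore(10)
tasks = [process_with_limit(semaphore, filename, metadata_dict, document_client)
         for filename in pdf_files]
results = await asyncio.gather(*tasks)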

Upvotes: 0

Views: 53

Answers (1)

Pavan

Reputation: 1371

I created an Azure storage account and a Document Intelligence resource. I gave a Contributor role assignment on the storage account.


  • I created one source container and one destination container, and uploaded one PDF file to the source container.
  • The code below executed successfully, produced an output file, and uploaded it to the destination container as a chunk file. (Note that it uploads the CSV with blob_client.upload_blob(..., overwrite=True) instead of a raw aiohttp PUT, so authentication is handled by the SDK.)

Python code:

import os
import logging
import asyncio
import aiohttp
import csv
from azure.storage.blob.aio import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer.aio import DocumentAnalysisClient
from azure.core.exceptions import HttpResponseError
# Configuration - Replace these placeholders with your actual values
AZURE_STORAGE_CONNECTION_STRING = "Your-storage conn-string"
AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT = "Your-document-intelligence endpoint"
AZURE_DOCUMENT_INTELLIGENCE_KEY = "Your doc-intelligence endpoint key"
BLOB_CONTAINER_NAME = "sample"  # Name of the Blob Container where your PDFs are stored
OUTPUT_CONTAINER_NAME = "test2"  # Name of the Blob Container where the output will be uploaded
# Fixed metadata keys list that will be appended to each CSV row
METADATA_KEYS = ["idx", "PartitionKey", "RowKey", "timestamp", "title", "summary",
                "sourcefile", "documenttypes", "organizations", "agencies", "programs", "keywords"]
# Function to list PDFs in the specified Azure Blob container
async def list_pdfs(container_name, max_files):
    async with BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)
        pdf_files = []
        async for blob in container_client.list_blobs():
            if blob.name.endswith(".pdf"):
                pdf_files.append(blob.name)
            if max_files and len(pdf_files) >= max_files:
                break
        return pdf_files
# Function to analyze a document from a blob using Form Recognizer
async def analyze_document_from_blob(blob_url, client):
    try:
        poller = await client.begin_analyze_document_from_url(model_id="prebuilt-layout", document_url=blob_url)
        result = await poller.result()
        # Extract the full text from the document
        full_text = "\n".join([line.content for page in result.pages for line in page.lines])
        return full_text
    except HttpResponseError as e:
        print(f"Error analyzing document from URL {blob_url}: {e.message}")
        return None
# Function to split the extracted text into smaller chunks
def chunk_text(text, chunk_size=6000):
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
# Function to upload processed file to Azure Blob storage
async def upload_to_blob(local_file_path, container_name, blob_name):
    async with BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        try:
            with open(local_file_path, "rb") as file:
                # Upload and overwrite if the file already exists
                await blob_client.upload_blob(file, overwrite=True)
            print(f"Uploaded {local_file_path} to {container_name}/{blob_name}")
        except Exception as e:
            print(f"Error uploading {local_file_path}: {e}")
# Function to process a single document
async def process_document(filename, metadata_dict, document_client):
    # Retrieve metadata; if none exists, use defaults with empty strings for each key
    metadata = metadata_dict.get(filename, {key: "" for key in METADATA_KEYS})
    if filename not in metadata_dict:
        print(f"Warning: No metadata found for {filename}. Using default empty values.")
    async with BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=filename)
        extracted_text = await analyze_document_from_blob(blob_client.url, document_client)
    if not extracted_text:
        print(f"Skipping {filename} (analysis failed).")
        return None
    chunks = chunk_text(extracted_text)
    # Build rows for CSV output using METADATA_KEYS
    rows = [
        [filename, i, chunk] + [metadata.get(key, "") for key in METADATA_KEYS]
        for i, chunk in enumerate(chunks)
    ]
    print(f"{filename} processed with {len(chunks)} chunks.")
    return rows
# Function to process all documents in the container
async def process_all_documents():
    # Dummy metadata (modify this as needed to include all expected files)
    metadata_dict = {
        "document1.pdf": {"title": "Title 1", "author": "Author 1"},
        "document2.pdf": {"title": "Title 2", "author": "Author 2"},
        "doc-intelligence.pdf": {"title": "Doc Intelligence", "author": "Your Name"}
    }
    # Get the list of PDFs from the container
    pdf_files = await list_pdfs(BLOB_CONTAINER_NAME, None)
    # Initialize Document Analysis Client
    endpoint = AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT
    api_key = AZURE_DOCUMENT_INTELLIGENCE_KEY
    async with DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(api_key)) as document_client:
        tasks = [process_document(filename, metadata_dict, document_client) for filename in pdf_files]
        results = await asyncio.gather(*tasks)
    # Write results to a CSV file with a header that matches our metadata fields
    with open("document_chunks.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "chunk_id", "text"] + METADATA_KEYS)
        for result in results:
            if result:
                writer.writerows(result)
    # Upload the processed CSV to Azure Blob storage
    await upload_to_blob("document_chunks.csv", OUTPUT_CONTAINER_NAME, "document_chunks.csv")
    print("Processing completed.")
# Main function to run the document processing
async def main():
    await process_all_documents()
# Run the script
if __name__ == "__main__":
    asyncio.run(main())

After running the above code, I got the response below in the console.

(.venv) C:\User
neDrive - Microsoft\Documents\stack\uploadbolb\test>python test1.py
Skipping doc-intelligence.pdf (no metadata found).
Uploaded document_chunks.csv to test2/document_chunks.csv
Processing completed.
(.venv) C:\Users
OneDrive - Microsoft\Documents\stack\uploadbolb\test>python test2.py
doc-intelligence.pdf processed with 1 chunks.
Uploaded document_chunks.csv to test2/document_chunks.csv
Processing completed.

Below is the chunk file that was created in the destination container.

Output:

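(The output screenshot is not reproduced here. Based on the header row written in process_all_documents, the CSV should start with a line like the following:)

filename,chunk_id,text,idx,PartitionKey,RowKey,timestamp,title,summary,sourcefile,documenttypes,organizations,agencies,programs,keywords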

Upvotes: -1
