Reputation: 1
I'm trying to fetch some documents from Azure and run Document Intelligence on them. The original script was taking 14 hours to run, so I tried to optimize it, but now I'm getting errors.
Error:

RuntimeWarning: coroutine 'process_all_documents' was never awaited
  chunk_documents.process_all_documents()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
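From what I can tell, the warning means a coroutine object is being created but never scheduled. A minimal illustration of the pattern (this is not my actual chunk_documents caller, just an example of what triggers the warning):

import asyncio

async def process_all_documents():
    ...

# Calling the coroutine function directly only creates a coroutine object and
# produces "RuntimeWarning: coroutine 'process_all_documents' was never awaited".
process_all_documents()

# It has to be awaited from async code, or driven with asyncio.run() from sync code:
asyncio.run(process_all_documents())

My actual script is below.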
import asyncio
import aiohttp
import csv
import config
import extract_metadata
from azure.storage.blob.aio import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer.aio import DocumentAnalysisClient
from azure.core.exceptions import HttpResponseError

async def list_pdfs(container_name, max_files):
    async with BlobServiceClient.from_connection_string(config.AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)
        pdf_files = []
        async for blob in container_client.list_blobs():
            if blob.name.endswith(".pdf"):
                pdf_files.append(blob.name)
                if max_files and len(pdf_files) >= max_files:
                    break
        return pdf_files

async def analyze_document_from_blob(blob_url, client):
    try:
        poller = await client.begin_analyze_document_from_url(model_id="prebuilt-layout", document_url=blob_url)
        result = await poller.result()
        full_text = "\n".join([line.content for page in result.pages for line in page.lines])
        return full_text
    except HttpResponseError as e:
        print(f"Error analyzing document from URL {blob_url}: {e.message}")
        return None

def chunk_text(text, chunk_size=6000):
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

async def upload_to_blob(local_file_path, container_name, blob_name):
    async with BlobServiceClient.from_connection_string(config.AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.put(blob_client.url, data=open(local_file_path, "rb")) as response:
                    if response.status == 201:
                        print(f"Uploaded {local_file_path} to {container_name}/{blob_name}")
                    else:
                        print(f"Failed to upload {local_file_path}. Status code: {response.status}")
        except Exception as e:
            print(f"Error uploading {local_file_path}: {e}")

async def process_document(filename, metadata_dict, document_client):
    metadata = metadata_dict.get(filename)
    if metadata is None:
        print(f"Skipping {filename} (no metadata found).")
        return None
    async with BlobServiceClient.from_connection_string(config.AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container="content", blob=filename)
        extracted_text = await analyze_document_from_blob(blob_client.url, document_client)
        if not extracted_text:
            print(f"Skipping {filename} (analysis failed).")
            return None
        chunks = chunk_text(extracted_text)
        # Define a fixed list of metadata keys
        metadata_keys = ["idx", "PartitionKey", "RowKey", "timestamp", "title", "summary",
                         "sourcefile", "documenttypes", "organizations", "agencies", "programs", "keywords"]
        rows = [[filename, i, chunk] + [metadata.get(key, "") for key in metadata_keys] for i, chunk in enumerate(chunks)]
        print(f"{filename} processed with {len(chunks)} chunks.")
        return rows

async def process_all_documents():
    metadata_dict = extract_metadata.load_metadata_from_blob()
    pdf_files = await list_pdfs("content", None)
    endpoint = config.AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT
    api_key = config.AZURE_DOCUMENT_INTELLIGENCE_KEY
    async with DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(api_key)) as document_client:
        semaphore = asyncio.Semaphore(10)  # Limit to 10 concurrent tasks
        tasks = [process_document(filename, metadata_dict, document_client) for filename in pdf_files]
        results = await asyncio.gather(*tasks)
    with open("document_chunks.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "chunk_id", "text"] + list(metadata_dict.keys()))
        for result in results:
            if result:
                writer.writerows(result)
    await upload_to_blob("document_chunks.csv", "metadata", "metadata_iris/document_chunks.csv")
    print("Processing completed.")

async def main():
    await process_all_documents()

if __name__ == "__main__":
    asyncio.run(main())
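For context on the optimization attempt: the idea was to cap the number of concurrent Document Intelligence calls with the semaphore, roughly like the sketch below. This is not what the code above currently does (there the semaphore is created but never acquired), the wrapper name is hypothetical, and the limit of 10 is just a guess on my part:

semaphore = asyncio.Semaphore(10)

async def process_document_limited(filename, metadata_dict, document_client):
    # Hypothetical wrapper: at most 10 documents are analyzed at the same time.
    async with semaphore:
        return await process_document(filename, metadata_dict, document_client)

# In process_all_documents() the tasks would then be built with the wrapper:
# tasks = [process_document_limited(filename, metadata_dict, document_client)
#          for filename in pdf_files]
# results = await asyncio.gather(*tasks)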
Upvotes: 0
Views: 53
Reputation: 1371
I created an Azure Storage account and an Azure AI Document Intelligence resource, and assigned the Contributor role on the storage account.
Python code:
import os
import logging
import asyncio
import aiohttp
import csv
from azure.storage.blob.aio import BlobServiceClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer.aio import DocumentAnalysisClient
from azure.core.exceptions import HttpResponseError

# Configuration - Replace these placeholders with your actual values
AZURE_STORAGE_CONNECTION_STRING = "Your-storage conn-string"
AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT = "Your-document-intelligence endpoint"
AZURE_DOCUMENT_INTELLIGENCE_KEY = "Your doc-intelligence endpoint key"
BLOB_CONTAINER_NAME = "sample"  # Name of the Blob Container where your PDFs are stored
OUTPUT_CONTAINER_NAME = "test2"  # Name of the Blob Container where the output will be uploaded

# Fixed metadata keys list that will be appended to each CSV row
METADATA_KEYS = ["idx", "PartitionKey", "RowKey", "timestamp", "title", "summary",
                 "sourcefile", "documenttypes", "organizations", "agencies", "programs", "keywords"]

# Function to list PDFs in the specified Azure Blob container
async def list_pdfs(container_name, max_files):
    async with BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        container_client = blob_service_client.get_container_client(container_name)
        pdf_files = []
        async for blob in container_client.list_blobs():
            if blob.name.endswith(".pdf"):
                pdf_files.append(blob.name)
                if max_files and len(pdf_files) >= max_files:
                    break
        return pdf_files

# Function to analyze a document from a blob using Form Recognizer
async def analyze_document_from_blob(blob_url, client):
    try:
        poller = await client.begin_analyze_document_from_url(model_id="prebuilt-layout", document_url=blob_url)
        result = await poller.result()
        # Extract the full text from the document
        full_text = "\n".join([line.content for page in result.pages for line in page.lines])
        return full_text
    except HttpResponseError as e:
        print(f"Error analyzing document from URL {blob_url}: {e.message}")
        return None

# Function to split the extracted text into smaller chunks
def chunk_text(text, chunk_size=6000):
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

# Function to upload processed file to Azure Blob storage
async def upload_to_blob(local_file_path, container_name, blob_name):
    async with BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        try:
            with open(local_file_path, "rb") as file:
                # Upload and overwrite if the file already exists
                await blob_client.upload_blob(file, overwrite=True)
            print(f"Uploaded {local_file_path} to {container_name}/{blob_name}")
        except Exception as e:
            print(f"Error uploading {local_file_path}: {e}")

# Function to process a single document
async def process_document(filename, metadata_dict, document_client):
    # Retrieve metadata; if none exists, use defaults with empty strings for each key
    metadata = metadata_dict.get(filename, {key: "" for key in METADATA_KEYS})
    if filename not in metadata_dict:
        print(f"Warning: No metadata found for {filename}. Using default empty values.")
    async with BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) as blob_service_client:
        blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=filename)
        extracted_text = await analyze_document_from_blob(blob_client.url, document_client)
        if not extracted_text:
            print(f"Skipping {filename} (analysis failed).")
            return None
        chunks = chunk_text(extracted_text)
        # Build rows for CSV output using METADATA_KEYS
        rows = [
            [filename, i, chunk] + [metadata.get(key, "") for key in METADATA_KEYS]
            for i, chunk in enumerate(chunks)
        ]
        print(f"{filename} processed with {len(chunks)} chunks.")
        return rows

# Function to process all documents in the container
async def process_all_documents():
    # Dummy metadata (modify this as needed to include all expected files)
    metadata_dict = {
        "document1.pdf": {"title": "Title 1", "author": "Author 1"},
        "document2.pdf": {"title": "Title 2", "author": "Author 2"},
        "doc-intelligence.pdf": {"title": "Doc Intelligence", "author": "Your Name"}
    }
    # Get the list of PDFs from the container
    pdf_files = await list_pdfs(BLOB_CONTAINER_NAME, None)
    # Initialize Document Analysis Client
    endpoint = AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT
    api_key = AZURE_DOCUMENT_INTELLIGENCE_KEY
    async with DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(api_key)) as document_client:
        tasks = [process_document(filename, metadata_dict, document_client) for filename in pdf_files]
        results = await asyncio.gather(*tasks)
    # Write results to a CSV file with a header that matches our metadata fields
    with open("document_chunks.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "chunk_id", "text"] + METADATA_KEYS)
        for result in results:
            if result:
                writer.writerows(result)
    # Upload the processed CSV to Azure Blob storage
    await upload_to_blob("document_chunks.csv", OUTPUT_CONTAINER_NAME, "document_chunks.csv")
    print("Processing completed.")

# Main function to run the document processing
async def main():
    await process_all_documents()

# Run the script
if __name__ == "__main__":
    asyncio.run(main())
After running the above code, I got the following response in the console:
(.venv) C:\User
neDrive - Microsoft\Documents\stack\uploadbolb\test>python test1.py
Skipping doc-intelligence.pdf (no metadata found).
Uploaded document_chunks.csv to test2/document_chunks.csv
Processing completed.
(.venv) C:\Users
OneDrive - Microsoft\Documents\stack\uploadbolb\test>python test2.py
doc-intelligence.pdf processed with 1 chunks.
Uploaded document_chunks.csv to test2/document_chunks.csv
Processing completed.
Below is the chunk file that was created in the destination container.
Output:
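For reference, the header row that the script writes to document_chunks.csv (from the writer.writerow call above) is:

filename, chunk_id, text, idx, PartitionKey, RowKey, timestamp, title, summary, sourcefile, documenttypes, organizations, agencies, programs, keywords

and each chunk of a processed PDF becomes one row under it.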
Upvotes: -1