David Hurley

Reputation: 45

Azure durable function activity trigger with python class

I have a function app with an HTTP trigger function that takes an orchestrator function name. I have two orchestrator functions for differing workflows. Both orchestrator functions need to read a list of files from blob storage and upload data to blob storage after the activity functions run. Following best practices, the blob I/O (list, upload) should live in activity functions called from the orchestrator (the orchestrator must be deterministic, with no I/O).

I have looked at the Azure fan out/fan in example for durable functions (https://github.com/Azure/azure-functions-durable-python/blob/dev/samples-v2/fan_in_fan_out/function_app.py). However, in that example BlobConnection() is called inside the BlobUpload() function. Ideally I want a class that accepts a blob container name, creates a connection, and exposes functions such as List, Upload, and Download that share that connection, so I am not creating the connection in each activity function. Can I somehow have a class as a function? Or do I need to recreate the connection string in each blob I/O activity function? Thanks
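Roughly, this hypothetical sketch is the kind of class I have in mind (BlobIO and its method names are placeholders of my own, not an existing API):

    import os

    from azure.storage.blob import BlobServiceClient

    class BlobIO:
        """One connection per container, shared by list/upload/download."""

        def __init__(self, container_name: str):
            service = BlobServiceClient.from_connection_string(os.getenv("AzureWebJobsStorage"))
            self.container = service.get_container_client(container_name)

        def list(self):
            return [b.name for b in self.container.list_blobs()]

        def upload(self, blob_name, data):
            self.container.upload_blob(blob_name, data, overwrite=True)

        def download(self, blob_name):
            return self.container.download_blob(blob_name).readall()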

Upvotes: 0

Views: 1174

Answers (1)

Vivek Vaibhav Shandilya

Reputation: 2616

You can use a helper function in your durable function app. I have created a get_blob_service_client helper function for the connection:

    def get_blob_service_client():
        connect_str = os.getenv('AzureWebJobsStorage')
        return BlobServiceClient.from_connection_string(connect_str)
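
If you also want to avoid rebuilding the client on every activity invocation, you could cache it at module level; a minimal sketch (same AzureWebJobsStorage setting, functools.lru_cache is just one way to memoize it):

    import os
    from functools import lru_cache

    from azure.storage.blob import BlobServiceClient

    @lru_cache(maxsize=1)
    def get_blob_service_client():
        # Built once per worker process, then reused by every activity call
        connect_str = os.getenv('AzureWebJobsStorage')
        return BlobServiceClient.from_connection_string(connect_str)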

I have made some changes to the function app from the sample you linked.

function_app.py

    import os
    import pathlib
    import logging
    
    from azure.storage.blob import BlobServiceClient
    from azure.core.exceptions import ResourceExistsError
    
    import azure.functions as func
    import azure.durable_functions as df
    
    myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    def get_blob_service_client():
        
        connect_str = os.getenv('AzureWebJobsStorage')
        return BlobServiceClient.from_connection_string(connect_str)
    
    
    @myApp.route(route="orchestrators/{functionName}")
    @myApp.durable_client_input(client_name="client")
    async def HttpStart(req: func.HttpRequest, client):
        
        instance_id = await client.start_new(req.route_params["functionName"])
    
        logging.info(f"Started orchestration with ID = '{instance_id}'.")
    
        return client.create_check_status_response(req, instance_id)
    
    @myApp.orchestration_trigger(context_name="context")
    def E2_BackupSiteContent(context: df.DurableOrchestrationContext):
        root_directory: str = r"C:\Users\< your username>\Desktop\test"
    
        if not root_directory:
            raise Exception("A directory path is required as input")
    
        files = yield context.call_activity("E2_GetFileList", root_directory)
        tasks = []
        for file in files:
            tasks.append(context.call_activity("E2_CopyFileToBlob", file))
        
        results = yield context.task_all(tasks)
        total_bytes = sum(results)
        return total_bytes
    
    
    
    @myApp.activity_trigger(input_name="rootDirectory")
    def E2_GetFileList(rootDirectory):
        all_file_paths = []
        
        for path, _, files in os.walk(rootDirectory):
            for name in files:
                file_path = os.path.join(path, name)
                all_file_paths.append(file_path)
        
        return all_file_paths
    
    @myApp.activity_trigger(input_name="filePath")
    def E2_CopyFileToBlob(filePath):
        blob_service_client = get_blob_service_client()
        container_name = "test"
       
        try:
            blob_service_client.create_container(container_name)
        except ResourceExistsError:
            pass
        parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
        blob_name = parent_dir + "_" + fname
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        
        # Upload the file contents (not just the path string)
        with open(filePath, "rb") as data:
            blob_client.upload_blob(data)
        byte_count = os.path.getsize(filePath)
 
        return byte_count

It ran perfectly locally and uploaded the files to the container "test".

As you can see, the files from "root_directory" have been uploaded to the storage account.

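If you still prefer the class you described: as far as I know an activity trigger itself has to be a function, not a class, but you can define an ordinary Python class at module level in the same function_app.py and call it from each activity, so the connection is created in one place. E2_CopyFileToBlob, for example, would then look like this rough sketch (BlobContainerHelper and its method names are my own, not part of the Durable Functions API):

    class BlobContainerHelper:
        """Hypothetical wrapper: one connection per container, reused by the activities."""

        def __init__(self, container_name: str):
            service = get_blob_service_client()
            try:
                service.create_container(container_name)
            except ResourceExistsError:
                pass
            self.container = service.get_container_client(container_name)

        def list(self):
            return [b.name for b in self.container.list_blobs()]

        def upload(self, blob_name, data):
            self.container.upload_blob(blob_name, data, overwrite=True)

    # Created once when the module loads; every activity below reuses it
    test_container = BlobContainerHelper("test")

    @myApp.activity_trigger(input_name="filePath")
    def E2_CopyFileToBlob(filePath):
        parent_dir, fname = pathlib.Path(filePath).parts[-2:]
        with open(filePath, "rb") as data:
            test_container.upload(parent_dir + "_" + fname, data)
        return os.path.getsize(filePath)

Note that this creates the connection (and the container) once when the app loads; if you would rather defer that until an activity actually runs, keep the helper-function approach above.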

Upvotes: 1
