Frederik Hansen

Reputation: 506

Wait for Data Factory pipeline to finish in Durable Function

I have a number of Azure Data Factory pipelines that I want to execute. The pipeline hierarchy is essentially a DAG, so I'm able to execute them in order using a topological sort.

I was thinking about doing this with Durable Functions, as they seem to fit the use case. However, I'm at a loss as to how to let the orchestrator know that a specific pipeline has finished. I would prefer not to modify the pipelines themselves, for example by calling an accept/reject function at the end.

I know I can do something like the code below, but does that go against best practices for Durable Functions?

var pipelineRunId = await context.CallActivityAsync<string>("StartPipeline", pipelineId);
var hasFinished = false;
while(!hasFinished)
{
    var fireAt = context.CurrentUtcDateTime.AddSeconds(30);
    await context.CreateTimer(fireAt, CancellationToken.None);
    hasFinished = await context.CallActivityAsync<bool>("CheckPipelineStatus", pipelineRunId);
}

Upvotes: 1

Views: 1832

Answers (2)

robs

Reputation: 940

You can both start a Data Factory pipeline and monitor it for completion from within a durable function.

To use Azure credentials with the Data Factory client, you will need to register an application in Azure Active Directory, generate a client secret for it, and then add that app registration to the Data Factory Contributor role, which you do under your Azure subscription's Access Control (IAM) blade.

https://learn.microsoft.com/en-us/powerapps/developer/data-platform/walkthrough-register-app-azure-active-directory

To start a pipeline, use the IPipelinesOperations interface. I have used the CreateRunWithHttpMessagesAsync method with a lot of success.

https://learn.microsoft.com/en-us/dotnet/api/microsoft.azure.management.datafactory.ipipelinesoperations.createrunwithhttpmessagesasync?view=azure-dotnet

Use a DataFactoryManagementClient to start the pipeline run like this:

//Set up credentials for the app registration (ADAL)
var authContext = new AuthenticationContext("https://login.microsoftonline.com/<your-tenant-id>");
ClientCredential cc = new ClientCredential("<Your-App-ClientId>", "<Your-App-Secret>");

AuthenticationResult result = await authContext.AcquireTokenAsync("https://management.azure.com/", cc);

ServiceClientCredentials credentials = new TokenCredentials(result.AccessToken);

//Create the Data Factory management client and start the pipeline
var dataFactoryMgmtClient = new DataFactoryManagementClient(credentials)
{
    SubscriptionId = "<your-Azure-SubscriptionId>"
};

Dictionary<string, object> parameters = new Dictionary<string, object>();

AzureOperationResponse<CreateRunResponse> runResponse = await dataFactoryMgmtClient.Pipelines.CreateRunWithHttpMessagesAsync(
    "<azure-resource-group>", "<factory-name>", "<pipeline-name>", parameters: parameters);

Use the parameters dictionary to pass any parameters your Data Factory pipeline expects.
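
For example, a minimal sketch (the parameter names below are hypothetical; use whatever parameters your pipeline actually defines):

//Hypothetical example of passing pipeline parameters
Dictionary<string, object> parameters = new Dictionary<string, object>
{
    { "windowStart", "2021-01-01T00:00:00Z" },
    { "sourceContainer", "raw-data" }
};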

The AzureOperationResponse<CreateRunResponse> runResponse contains a run id which you'll find in runResponse.Body.RunId. You need this to monitor the execution of the pipeline.

Start the pipeline in an ActivityTrigger function. I suggest using CallActivityWithRetryAsync to call it with a retry.
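
A minimal sketch of what that could look like (the activity name "StartAdfPipeline" and the way dataFactoryMgmtClient is made available to the activity are assumptions, not part of the original answer):

//Hypothetical activity that wraps CreateRunWithHttpMessagesAsync and returns the run id
[FunctionName("StartAdfPipeline")]
public async Task<string> StartAdfPipeline([ActivityTrigger] string pipelineName)
{
    //dataFactoryMgmtClient created/injected as shown above
    AzureOperationResponse<CreateRunResponse> runResponse = await dataFactoryMgmtClient.Pipelines.CreateRunWithHttpMessagesAsync(
        "<azure-resource-group>", "<factory-name>", pipelineName);
    return runResponse.Body.RunId;
}

//In the orchestrator: call the activity with a retry policy
var retryOptions = new RetryOptions(firstRetryInterval: TimeSpan.FromSeconds(10), maxNumberOfAttempts: 3);
string runId = await context.CallActivityWithRetryAsync<string>("StartAdfPipeline", retryOptions, "<pipeline-name>");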

To monitor the execution you will need to set up another orchestration. Something like this:

[FunctionName("AdfMonitor")]
public async Task<string> RunMonitorOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext monitorContext)
{
    var _replayLog = monitorContext.CreateReplaySafeLogger(_log);
    string runId = monitorContext.GetInput<string>(); //pass the run id to the orchestration
    string status = "Queued";

    //Timeout after waiting one hour
    DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(1);
    
    while (monitorContext.CurrentUtcDateTime < endTime && (status == "InProgress" || status == "Queued"))
    {
        status = await monitorContext.CallActivityAsync<string>("CheckPipelineStatus", runId);

        _replayLog.LogInformation($"Adf Pipeline status is {status}.");

        if (status == "InProgress" || status == "Queued")
        {
            var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddSeconds(5);
            await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
        }
    }

    return status;
}

Launch the monitor as a sub-orchestration immediately after calling the activity that starts your pipeline.

//Monitor pipeline progress from the parent orchestrator
string pipelineStatus = await context.CallSubOrchestratorAsync<string>("AdfMonitor", runId);

Use the CreateTimer function to set the next time you want to check the pipeline status. In this example it checks every 5 seconds, which is probably more frequent than you need.

Notice also the timeout in the while loop: if something goes wrong, we give up after an hour.

Here's the link to MS docs on monitors in durable functions:

https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-monitor?tabs=csharp

Use another ActivityTrigger function to check the pipeline status:

[FunctionName("CheckPipelineStatus")]
public async Task<string> CheckRunStatus([ActivityTrigger] string runId)
{
    return await dataFactoryMgmtClient.GetPipelineStatus(runId);
}

Hope that helps. I use this technique a lot: the durable function loads many small batches of data into the data lake and then triggers ADF pipelines that run in parallel, handling the merge, copy and transform of the data.
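
As a rough sketch of that parallel pattern (the pipeline names below are made up, and "StartAdfPipeline" refers to the hypothetical start activity sketched earlier), the orchestrator can fan out several runs and await all of the monitor sub-orchestrations together:

//Hypothetical fan-out: start each pipeline, then wait for every monitor to finish
var monitorTasks = new List<Task<string>>();
foreach (var pipelineName in new[] { "MergePipeline", "CopyPipeline", "TransformPipeline" })
{
    string runId = await context.CallActivityAsync<string>("StartAdfPipeline", pipelineName);
    monitorTasks.Add(context.CallSubOrchestratorAsync<string>("AdfMonitor", runId));
}
string[] finalStatuses = await Task.WhenAll(monitorTasks);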

Upvotes: 0

Utkarsh Pal

Reputation: 4544

The major difference between Durable Functions and regular Azure Functions is that they run asynchronously. When you trigger a Durable Function, it starts a background process and gives you a few URLs you can use to interact with that process, including one to query its status.
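
For reference, those URLs come from the standard HTTP starter pattern on the functions side; here's a minimal sketch (the function and orchestration names are placeholders, not from this answer). The response it returns includes statusQueryGetUri, which is what the Data Factory Web activity polls below:

//Sketch of an HTTP-triggered starter; CreateCheckStatusResponse returns the management URLs
[FunctionName("HttpStart")]
public async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter)
{
    string instanceId = await starter.StartNewAsync("MyOrchestration", null);
    return starter.CreateCheckStatusResponse(req, instanceId);
}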

You can trigger a Durable Function through an HTTP endpoint, wait for it to finish and then get the result. Here's how it looks:

[Screenshot: the Data Factory pipeline, with an Azure Function activity followed by an Until activity]

And here’s how it looks inside the Until activity:

[Screenshot: the activities inside the Until activity (a Wait activity and a CheckStatus Web activity)]

Here’s the process:

  1. First, we trigger our Durable Function through an HTTP trigger using an Azure Function activity.

  2. Then, with the Until activity, we check the status of that function.

    • The Wait activity waits about 30 seconds (or a different interval, up to you) to give the function time to execute.

    • The Web activity makes a request to the statusQueryGetUri that the Azure Function activity returns, by calling **@activity('StartUntar').output.statusQueryGetUri**

  3. The Until activity checks the result of the CheckStatus Web activity with the expression **@not(or(equals(activity('CheckStatus').output.runtimeStatus, 'Pending'), equals(activity('CheckStatus').output.runtimeStatus, 'Running')))**

  4. This repeats until the function finishes or fails, or until it times out (set on the Timeout property).

Upvotes: 4
