Reputation: 506
I have a number of Azure Data Factory pipelines that I want to execute. The pipeline hierarchy is essentially a DAG, so I'm able to execute them in order using a topological sort.
I was thinking about doing this with Durable Functions as they seem to fit the use case. However, I'm at a loss as to how I let the orchestrator know that a specific pipeline has finished. I would prefer not to modify the pipelines, e.g. by calling an accept/reject function at the end.
I know I can do something like the below code, but does that go against best practices regarding Durable Functions?
var pipelineRunId = await context.CallActivityAsync<string>("StartPipeline", pipelineId);
var hasFinished = false;
while (!hasFinished)
{
    var fireAt = context.CurrentUtcDateTime.AddSeconds(30);
    await context.CreateTimer(fireAt, CancellationToken.None);
    hasFinished = await context.CallActivityAsync<bool>("CheckPipelineStatus", pipelineRunId);
}
Upvotes: 1
Views: 1832
Reputation: 940
You can both start a Data Factory pipeline and monitor it for completion from within a durable function.
For the credentials passed to the data factory client, you will need to register your application in Azure Active Directory, generate a secret for it, and then add your application registration to the Data Factory Contributor role, which you do under your Azure subscription's Access Control (IAM) blade.
To start a pipeline, use the IPipelinesOperations interface; I have used the CreateRunWithHttpMessagesAsync method with a lot of success. Use a DataFactoryManagementClient to get at the pipelines, like this:
//Set up credentials for the app registration (ADAL)
var authContext = new AuthenticationContext("https://login.microsoftonline.com/<your-tenantId>");
ClientCredential cc = new ClientCredential("<your-app-clientId>", "<your-app-secret>");
AuthenticationResult result = await authContext.AcquireTokenAsync("https://management.azure.com/", cc);
ServiceClientCredentials credentials = new TokenCredentials(result.AccessToken);

//Create the data factory client and start the pipeline
var dataFactoryMgmtClient = new DataFactoryManagementClient(credentials)
{
    SubscriptionId = "<your-Azure-SubscriptionId>"
};

Dictionary<string, object> parameters = new Dictionary<string, object>();

AzureOperationResponse<CreateRunResponse> runResponse = await
    dataFactoryMgmtClient.Pipelines.CreateRunWithHttpMessagesAsync(
        "<azure-resource-group>", "<factory-name>", "<pipeline-name>", parameters: parameters);
Use the parameters dictionary to pass any parameters your Data Factory pipeline expects.
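For example, you would fill the dictionary before calling CreateRunWithHttpMessagesAsync; the parameter names below are purely hypothetical, substitute whatever your pipeline actually defines:
//Hypothetical pipeline parameter names, for illustration only
parameters["WindowStart"] = "2021-01-01T00:00:00Z";
parameters["SourceContainer"] = "raw";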
The AzureOperationResponse<CreateRunResponse> runResponse contains a run id, which you'll find in runResponse.Body.RunId. You need this to monitor the execution of the pipeline.
Start the pipeline in an ActivityTrigger function; I suggest calling it from the orchestrator with CallActivityWithRetryAsync so the call is retried on failure.
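Roughly, such an activity could look like the sketch below, assuming the DataFactoryManagementClient from above is held as a class field; the resource group and factory names are placeholders:
[FunctionName("StartPipeline")]
public async Task<string> StartPipeline([ActivityTrigger] string pipelineName)
{
    //dataFactoryMgmtClient is the DataFactoryManagementClient created as shown above
    var parameters = new Dictionary<string, object>();
    AzureOperationResponse<CreateRunResponse> runResponse = await
        dataFactoryMgmtClient.Pipelines.CreateRunWithHttpMessagesAsync(
            "<azure-resource-group>", "<factory-name>", pipelineName, parameters: parameters);

    //Return the run id so the orchestrator can monitor the run
    return runResponse.Body.RunId;
}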
To monitor the execution you will need to set up another orchestration. Something like this:
[FunctionName("AdfMonitor")]
public async Task<string> RunMonitorOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext monitorContext)
{
var _replayLog = monitorContext.CreateReplaySafeLogger(_log);
string runId = monitorContext.GetInput<string>(); //pass the run id to the orchestration
string status = "Queued";
//Timeout after waiting one hour
DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(1);
while (monitorContext.CurrentUtcDateTime < endTime && (status == "InProgress" || status == "Queued"))
{
status = await monitorContext.CallActivityAsync<string>("CheckPipelineStatus", runId);
_replayLog.LogInformation($"Adf Pipeline status is {status}.");
if (status == "InProgress" || status == "Queued")
{
var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddSeconds(5);
await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
}
}
return status;
}
Launch the monitor as a sub-orchestration immediately after calling the activity that starts your pipeline.
//Monitor pipeline progress
string pipelineStatus = await context.CallSubOrchestratorAsync<string>("AdfMonitor", runId); //context is the parent orchestrator's IDurableOrchestrationContext
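Putting the pieces together, a rough sketch of the parent orchestrator could look like this, assuming the StartPipeline activity above returns the run id (the orchestrator name and retry values are just examples):
[FunctionName("AdfOrchestrator")]
public async Task<string> RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string pipelineName = context.GetInput<string>();

    //Start the ADF pipeline, retrying the activity a few times on failure
    string runId = await context.CallActivityWithRetryAsync<string>(
        "StartPipeline", new RetryOptions(TimeSpan.FromSeconds(10), 3), pipelineName);

    //Monitor pipeline progress via the sub-orchestration
    string pipelineStatus = await context.CallSubOrchestratorAsync<string>("AdfMonitor", runId);
    return pipelineStatus;
}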
Use the CreateTimer function to set the next time you want to check your pipeline status. In this example it wakes up and checks the pipeline status every 5 seconds, which is maybe a bit too frequent. Notice also the use of the timeout in the while loop, so if something goes wrong we just give up after an hour.
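If the 5-second interval is too aggressive, one option (a sketch, not part of the original pattern above) is to back off between checks inside the loop:
//checkCount is an int declared before the while loop, starting at 0
//Poll less often as the pipeline keeps running: 15s, 30s, then every 60s
var delaySeconds = Math.Min(15 * Math.Pow(2, checkCount), 60);
var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddSeconds(delaySeconds);
await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
checkCount++;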
See the MS docs on the monitor pattern in durable functions for more details.
Use another ActivityTrigger function to check the pipeline status:
[FunctionName("CheckPipelineStatus")]
public async Task<string> CheckRunStatus([ActivityTrigger] string runId)
{
return await dataFactoryMgmtClient.GetPipelineStatus(runId);
}
Hope that helps. I use this technique a lot: the durable function loads many small batches of data into the data lake and then triggers ADF pipelines that run in parallel, handling the merge, copy & transform of the data.
Upvotes: 0
Reputation: 4544
The major difference between Durable Functions and regular Azure Functions is that they run asynchronously. When you trigger a Durable Function, it creates a background process and gives you a few URLs that you can use to interact with that process, including one to query its status.
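Those URLs come from the standard Durable Functions HTTP starter. A minimal sketch of such a starter is below; the function and orchestrator names are placeholders, not from the original setup:
[FunctionName("HttpStart")]
public async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter)
{
    //Read the orchestration input from the request body (here: a simple string)
    string input = await req.Content.ReadAsStringAsync();
    string instanceId = await starter.StartNewAsync("MyOrchestrator", input);

    //The response body contains statusQueryGetUri, among other management URLs
    return starter.CreateCheckStatusResponse(req, instanceId);
}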
You can trigger a Durable Function through an HTTP endpoint, wait for it to finish and then get the result. Here's what it looks like:
And here’s how it looks inside the Until activity:
Here’s the process:
First, we trigger our Durable Function through its HTTP trigger using an Azure Function activity.
Then, with the Until activity, we check the status of that function.
The Wait activity waits around 30 seconds (or a different interval, up to you) to give the function time to execute.
The Web activity makes a request to the statusQueryGetUri that the Azure Function activity returns, by calling **@activity('StartUntar').output.statusQueryGetUri**
The Until activity checks the result of the CheckStatus Web activity with the expression **@not(or(equals(activity('CheckStatus').output.runtimeStatus, 'Pending'), equals(activity('CheckStatus').output.runtimeStatus, 'Running')))**
It repeats until the function has finished or failed, or until it times out (set via the Timeout property).
Upvotes: 4