Reputation: 1
I have the requirement to execute an Azure Function (let's call it OperationalFunction) only once in parallel. What I mean is the following:
I have two "entry point" functions:
I have a BlobTrigger Function (BlobTriggeredStartFunction) I have a HttpTrigger Function (HttpTriggeredStartFunction) Both of those functions call an orchestrator function (OrchestratorFunction) like this:
string instanceId = await starter.StartNewAsync(nameof(OrchestrationFunction),null,data);
This orchestration function has the following code:
[FunctionName("OrchestrationFunction")]
public async Task OrchestrationFunction(
[OrchestrationTrigger] IDurableOrchestrationContext context){
var input = context.GetInput<string>();
await context.CallActivityAsync(nameof(OperationalFunction),input);
}
I want that the *OperationalFunction *is executed ONLY ONCE in parallel, so that if I receive multiple triggers (n blob/http-triggered events), the *OperationalFunction *at a certain time, is executing only one instance, when such instance finishes, then the new event in the queue (FIFO) is executed.
Which is the best approach to that?
I tried to edit the host.json file by adding:
"blobs": {
"maxDegreeOfParallelism": 1,
"poisonBlobThreshold": 1
}
But it is not exactly what I am looking for, just a part of the problem is solved.
I also tried to add a loop do...while for getting the status of the orchestrator:
do {
status = await starter.GetStatusAsync(instanceId);
await Task.Delay(2000);
}
while (
status.RuntimeStatus == OrchestrationRuntimeStatus.Running ||
status.RuntimeStatus == OrchestrationRuntimeStatus.Pending);
But I don't think this is the most efficient solution.
Upvotes: 0
Views: 76
Reputation: 3649
I created a sample Durable Orchestration with two triggers (Blob trigger and HTTP trigger) and successfully executed the OperationalFunction
only once in parallel.
host.json
, we can ensure that OperationalFunction
runs only one instance at a time across all triggers. "durableTask": {
"maxConcurrentActivityFunctions": 1,
"maxConcurrentOrchestrations": 1
}
Below is the complete code for the durable orchestration using both Blob trigger and HTTP trigger functions.
HttpTriggeredStartFunction.cs :
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Http;
namespace FunctionApp5.Triggers
{
public class HttpTriggeredStartFunction
{
[FunctionName("HttpTriggeredStartFunction")]
public async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
log.LogInformation("HTTP Triggered start function hit.");
string requestData = await new StreamReader(req.Body).ReadToEndAsync();
await starter.StartNewAsync("OrchestrationFunction", null, requestData);
return new OkObjectResult("HTTP orchestration started successfully.");
}
}
}
BlobTriggeredStartFunction.cs :
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading.Tasks;
namespace FunctionApp5.Triggers
{
public class BlobTriggeredStartFunction
{
[FunctionName("BlobTriggeredStartFunction")]
public async Task RunAsync(
[BlobTrigger("kamcontainer/{name}", Connection = "AzureWebJobsStorage")] Stream blobStream,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
log.LogInformation($"Blob trigger received for: {blobStream}");
await starter.StartNewAsync("OrchestrationFunction", null, "blob-data");
}
}
}
OperationalFunction.cs :
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace FunctionApp5.Activities
{
public class OperationalFunction
{
[FunctionName("OperationalFunction")]
public async Task<string> RunAsync([ActivityTrigger] string input, ILogger log)
{
log.LogInformation($"OperationalFunction execution started with input: {input}");
await Task.Delay(5000);
log.LogInformation("OperationalFunction execution finished.");
return "Success";
}
}
}
OrchestrationFunction.cs :
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
namespace FunctionApp5.Orchestrator
{
public class OrchestrationFunction
{
[FunctionName("OrchestrationFunction")]
public async Task OrchestrationFunctionAsync(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
log.LogInformation("Starting orchestration.");
var input = context.GetInput<string>();
await context.CallActivityAsync<string>("OperationalFunction", input);
log.LogInformation("Operational Function finished execution.");
}
}
}
Output :
The Durable Orchestration function executed successfully, and the logs from the Blob trigger function were displayed as shown below.
I successfully sent a POST
request to the HTTP trigger function using Postman, as shown below.
I got below function logs after post request.
Upvotes: 0