Reputation: 313
Function 1 sends a message body via HTTP POST every 5 minutes. In Azure Data Factory, I want to receive this message and pass its body to Function 2 as input.
{ "body": "run" }
How can I receive this message in Azure Data Factory?
Upvotes: 2
Views: 3664
Reputation: 1145
Okay first off, you can definitely trigger a pipeline execution in ADF via a REST API call and even pass parameters (such as body contents) to that pipeline.
Refer to this link: https://learn.microsoft.com/en-us/rest/api/datafactory/pipelines/create-run
I’m not at my computer right now, but I will give an example tomorrow. That being said, there are many ways to do what you want, within ADF and without ADF.
Here is an example of calling an Azure Data Factory pipeline from an HTTP-triggered Azure Function. I apologize, I am not fluent in C#, so the code might look bad, but I have tested this and it works.
Here is the code I am using for the Azure function:
#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;

public static async Task<IActionResult> Run(HttpRequest req, ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    // Read an optional PipelineName from the query string or the JSON body.
    // Note: it is not used below, because the pipeline name is hard-coded in the URI.
    string name = req.Query["PipelineName"];
    string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
    dynamic data = JsonConvert.DeserializeObject(requestBody);
    name = name ?? data?.PipelineName;

    using (var client = new HttpClient())
    {
        // The Azure Function has a system-assigned managed identity, so RBAC roles can grant it
        // access to the Azure Management API and to Azure Data Factory.
        // This call gets the access token used to authenticate the ADF API call below.
        client.DefaultRequestHeaders.Add("Secret", Environment.GetEnvironmentVariable("MSI_SECRET"));
        var response = await client.GetAsync(String.Format("{0}/?resource={1}&api-version={2}", Environment.GetEnvironmentVariable("MSI_ENDPOINT"), "https://management.azure.com/", "2017-09-01"));
        string msiResponse = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
        dynamic msiResponseData = JsonConvert.DeserializeObject(msiResponse);
        string accessToken = msiResponseData?.access_token;

        using (var client1 = new HttpClient())
        {
            // URI format:
            // https://management.azure.com/subscriptions/[SubscriptionID]/resourceGroups/[ResourceGroupName]/providers/Microsoft.DataFactory/factories/[DataFactoryName]/pipelines/[PipelineName]/createRun?api-version=2018-06-01
            string uri = "https://management.azure.com/subscriptions/xxxxxxxxxxxxxxxxxx/resourceGroups/xxxxxxxxx/providers/Microsoft.DataFactory/factories/xxxxxxxxxxxxx/pipelines/ADFTestPipeline/createRun?api-version=2018-06-01";

            // To pass parameters to the ADF pipeline, put them in this JSON string; "{}" passes none.
            string content = "{}";
            client1.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);

            // POST to the ADF createRun endpoint using the bearer token.
            var response1 = await client1.PostAsync(uri, new StringContent(content, Encoding.UTF8, "application/json"));
            var result = await response1.Content.ReadAsStringAsync();
            return new OkObjectResult(result);
        }
    }
}
For the function to work, your Azure Function needs a system-assigned managed identity, and that identity needs permission both to call the Azure Management API and to run pipelines in your ADF. Both permissions can be assigned via RBAC.
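The function above reads the legacy MSI_ENDPOINT and MSI_SECRET variables with api-version 2017-09-01. As a minimal sketch, assuming your Function App also exposes the newer IDENTITY_ENDPOINT and IDENTITY_HEADER environment variables, the token request could be built like this. The class and method names are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Net.Http;

// Hypothetical helper, not part of the original answer.
public static class ManagedIdentityToken
{
    // Builds the GET request for the managed identity token endpoint.
    // identityEndpoint and identityHeader are assumed to come from the
    // IDENTITY_ENDPOINT and IDENTITY_HEADER environment variables.
    public static HttpRequestMessage BuildRequest(string identityEndpoint, string identityHeader, string resource)
    {
        string uri = identityEndpoint
            + "?resource=" + Uri.EscapeDataString(resource)
            + "&api-version=2019-08-01";

        var request = new HttpRequestMessage(HttpMethod.Get, uri);
        // This API version expects the secret in the X-IDENTITY-HEADER header.
        request.Headers.Add("X-IDENTITY-HEADER", identityHeader);
        return request;
    }
}
```

You would send the request with an HttpClient and read access_token from the JSON response, the same way the function above does.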
[Screenshot: system-assigned managed identity enabled on the Azure Function]
[Screenshot: roles your Azure Function needs]
Proof that it works:
[Screenshot: Azure Function output/response]
[Screenshot: ADF Monitor tab]
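To forward the message from Function 1 to the pipeline, the content string in the function above can carry parameters instead of "{}". The createRun request body is a JSON object that maps pipeline parameter names to values. Here is a rough sketch, assuming the pipeline declares a string parameter named body (my assumption, not something from the question); the class and method names are hypothetical, and I use System.Text.Json here only to keep the snippet standalone.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper, not part of the original function.
public static class CreateRunRequest
{
    // Builds the createRun URI; every argument is a value you supply yourself.
    public static string BuildUri(string subscriptionId, string resourceGroup, string factoryName, string pipelineName)
    {
        return "https://management.azure.com/subscriptions/" + Uri.EscapeDataString(subscriptionId)
            + "/resourceGroups/" + Uri.EscapeDataString(resourceGroup)
            + "/providers/Microsoft.DataFactory/factories/" + Uri.EscapeDataString(factoryName)
            + "/pipelines/" + Uri.EscapeDataString(pipelineName)
            + "/createRun?api-version=2018-06-01";
    }

    // Serializes the incoming message as the value of the assumed "body" pipeline parameter.
    public static string BuildBody(string message)
    {
        var parameters = new Dictionary<string, string> { ["body"] = message };
        return JsonSerializer.Serialize(parameters);
    }
}
```

With that in place, content would be CreateRunRequest.BuildBody("run"), and inside the pipeline the value is available as @pipeline().parameters.body, for example as the body of an Azure Function activity that calls Function 2.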
I hope this helps and gives you an idea of how to make this work. Please accept as answer if it meets your needs and let me know if you need any more detail!
Upvotes: 3
Reputation: 29720
Azure Data Factory (ADF) supports a limited set of triggers. An HTTP trigger is not one of them.
I would suggest having Function1 call Function2 directly, and then having Function2 store the data in a blob file.
After that you can use the Storage event trigger of ADF to run the pipeline:
Storage event trigger runs a pipeline against events happening in a Storage account, such as the arrival of a file, or the deletion of a file in Azure Blob Storage account.
From there you can use the ADF pipeline to pick up the data and process it further in the pipeline.
Upvotes: 0