Alber Tadrous

Reputation: 69

Sending an event on creating a new file in Azure Data Lake Gen 1

I want to send an event or a notification to an external NiFi flow once a new file has been added to Azure Data Lake Gen 1.

Has anyone worked on or have any information about this use case?

Upvotes: 5

Views: 2158

Answers (4)

Raunak Jhawar

Reputation: 1651

This can also be solved using the following approach:

  • Create a new function app with a blob-triggered function (fired on the Microsoft.Storage.BlobCreated event)
  • The event handler (i.e. the function body) runs your code to publish a message to Event Hub (see the sketch after this list)
  • Create a new flow in NiFi and use the GetAzureEventHub processor to receive messages from the target namespace
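
A minimal sketch of such a function, assuming in-process C# Azure Functions; the container name stackdemo-files, the hub name stackdemo and the connection setting names are placeholders, not from the original answer:

using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class BlobCreatedToEventHub
{
    [FunctionName("BlobCreatedToEventHub")]
    public static async Task Run(
        [BlobTrigger("stackdemo-files/{name}", Connection = "StorageConnectionAppSetting")] Stream blob,
        string name,
        [EventHub("stackdemo", Connection = "EventHubConnectionAppSetting")] IAsyncCollector<string> outputEvents,
        ILogger log)
    {
        // Publish the name of the newly created blob so that a NiFi flow
        // using the GetAzureEventHub processor can pick it up.
        await outputEvents.AddAsync(name);
        log.LogInformation($"Published event for new blob: {name}");
    }
}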

Upvotes: 0

Adam Marczak

Reputation: 2351

Hey, this is a very good question. Unfortunately, there is currently no out-of-the-box event you can hook into, but I found a way to do it using diagnostic logs.

So the idea is simple:

  1. Set up diagnostic logs on ADLSv1 to stream logs to an event hub
  2. Trigger a logic app for new entries on the event hub



So, to set up diagnostic logs on ADLSv1:

  1. Go to diagnostic settings on ADLSv1


  2. Add diagnostic settings


  3. Set up the link to your event hub



Now set up the logic app:

  1. Open logic app designer


  2. Add an event hub trigger (this assumes a hub named stackdemohub exists)


  3. Add a filter array action


3.1. Set From to the expression

triggerBody()?['ContentData']['records']

3.2. Set the right condition to 'create' and the left condition field to the expression

item()['operationName']

  4. Add a for-each loop and pass the body from the filter array step


  5. Save and run it

5.1. You will see that when the logic app is executed successfully, you will find the new files in the list.

As you can see, one of the files I uploaded was called MarketplaceCharges.json, inside the demo folder.

Each event looks like this:

{
    "time": "2019-09-18T07:48:20.342Z",
    "resourceId": "/SUBSCRIPTIONS/2BCB9F3D-3F6B-4345-A49E-86D3141C7F73/RESOURCEGROUPS/STACKDEMO/PROVIDERS/MICROSOFT.DATALAKESTORE/ACCOUNTS/STACKDEMO",
    "category": "Requests",
    "operationName": "create",
    "resultType": "201",
    "callerIpAddress": "::ffff:111.222.333.444",
    "correlationId": "93faafd5-dfa2-4432-91f8-c7f360d80655",
    "identity": "[email protected]",
    "properties": {
      "HttpMethod": "PUT",
      "Path": "/webhdfs/v1/demo/MarketplaceCharges.json",
      "RequestContentLength": 0,
      "ClientRequestId": "288c654f-0948-4468-8e92-b158cc265c54",
      "StartTime": "2019-09-18T07:48:20.264Z",
      "EndTime": "2019-09-18T07:48:20.334Z",
      "UserId": "8162E212-E32B-443C-8F13-1CDA7B264DDB"
    }
}

and you get a value with the file path /webhdfs/v1/demo/MarketplaceCharges.json.

I created 3 files on ADLSv1 and got 3 items in the loop, as expected.


Now you can do whatever you want with this event information and send it wherever you need.


On a closing note, you might want to switch from Logic Apps to Function Apps for larger volumes of requests, as logic apps are not cheap at larger scale:

using System.Text;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

[FunctionName("EventHubTriggerCSharp")]
public static void Run([EventHubTrigger("stackdemo", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
    foreach (var message in eventHubMessages)
    {
        // Decode the message body, then parse the items and do something
        var body = Encoding.UTF8.GetString(message.Body.Array, message.Body.Offset, message.Body.Count);
        log.LogInformation(body);
    }
}

Also, one important note: for larger files, make sure to add some delay to the process, as the 'create' request is logged when the file appears on storage even if it is still being copied; in that case, further 'append' events keep coming through.
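
For example, a minimal sketch of that parsing, assuming the payload shape shown above (a "records" array) and acting only on 'create' operations; the helper name and callback are hypothetical:

using System;
using System.Text.Json;

public static class DiagnosticLogParser
{
    // Extracts the paths of newly created files from one event hub message body,
    // i.e. a JSON payload of the shape {"records": [...]} shown above.
    public static void HandleCreateRecords(string body, Action<string> onCreated)
    {
        using var doc = JsonDocument.Parse(body);
        foreach (var record in doc.RootElement.GetProperty("records").EnumerateArray())
        {
            if (record.GetProperty("operationName").GetString() == "create")
            {
                var path = record.GetProperty("properties").GetProperty("Path").GetString();
                onCreated(path); // e.g. enqueue with a delay before fetching the file
            }
        }
    }
}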

Upvotes: 6

Alber Tadrous

Reputation: 69

I found a solution to this problem: by using Azure Data Lake Gen 2 we can trigger different kinds of events, such as an Azure Function, and make this function send a notification with the path of the file that was just added to the NiFi flow, which starts with one of the listener processors such as ListenHTTP or HandleHttpRequest (a sketch follows below). After that, we can use any fetch processor to fetch this file from your storage.
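
A minimal sketch, assuming an Event Grid subscription on the Gen 2 account's Microsoft.Storage.BlobCreated events and a NiFi ListenHTTP processor at a hypothetical http://nifi-host:8081/contentListener endpoint:

using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Extensions.Logging;

public static class AdlsGen2ToNifi
{
    private static readonly HttpClient httpClient = new HttpClient();

    [FunctionName("AdlsGen2ToNifi")]
    public static async Task Run([EventGridTrigger] EventGridEvent eventGridEvent, ILogger log)
    {
        // For Microsoft.Storage.BlobCreated events the subject carries the path
        // of the created file; POST it to the NiFi ListenHTTP endpoint.
        var response = await httpClient.PostAsync(
            "http://nifi-host:8081/contentListener", // hypothetical NiFi endpoint
            new StringContent(eventGridEvent.Subject));
        log.LogInformation($"Notified NiFi, status: {response.StatusCode}");
    }
}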

Upvotes: -1

DixitArora-MSFT

Reputation: 1811

Though Azure Data Lake Storage (ADLS) Gen2 is built upon Azure Blob Storage, there are a couple of known issues and differences which are documented.

Because of these differences, I believe we can't use the existing bindings available for Blob storage or Event Grid.

But you could still have a Function triggered by a Timer, for example, and use the ADLS Gen2 REST API to read/update files, along the lines of the sketch below.
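
A rough sketch (the account name, filesystem name and token acquisition are placeholders; the call is the 'Path - List' operation of the ADLS Gen2 REST API):

using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class PollAdlsGen2
{
    private static readonly HttpClient httpClient = new HttpClient();

    [FunctionName("PollAdlsGen2")]
    public static async Task Run([TimerTrigger("0 */5 * * * *")] TimerInfo timer, ILogger log)
    {
        // List all paths in a filesystem and compare the result to a stored
        // snapshot to detect newly created files.
        var request = new HttpRequestMessage(HttpMethod.Get,
            "https://myaccount.dfs.core.windows.net/myfilesystem?resource=filesystem&recursive=true");
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", "<access-token>");

        var response = await httpClient.SendAsync(request);
        log.LogInformation(await response.Content.ReadAsStringAsync());
    }
}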

Upvotes: -1
