Stark

Reputation: 634

Azure consumer function to receive Event Hub data and write it to Azure Blob Storage using Python

I want to create an Azure Function using Python which will read data from Azure Event Hub. Fortunately, Visual Studio Code provides a way to create an Azure Functions skeleton that can be edited according to the requirement. I was able to create a demo HTTP-trigger Azure Function with the help of the Microsoft documentation, but I don't know what changes I should make in the function below so that it reads data from the Event Hub and writes it to Azure Blob Storage. Also, can someone suggest a blog to get more details on Azure Functions and standard practice?

UPDATE:

I tried to update my code based on the suggestion of @Stanley, but it possibly still needs some changes. I have written the following code in my Azure function.

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "Storage account connection string",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "EventHub_ReceiverConnectionString": "Endpoint Connection String of the EventHubNamespace",
    "Blob_StorageConnectionString": "Storage account connection string"
  }
}

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "eventHubTrigger",
      "direction": "in",
      "name": "event",
      "eventHubName": "pwo-events",
      "connection": "EventHub_ReceiverConnectionString",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "binary"
    }
  ]
}
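A side note (my reading of the docs, not part of the original question): with "cardinality": "many", the Python worker delivers a batch, so the handler is usually declared to take a list of events; the parameter name still has to match the binding's "name" ("event" here). A rough sketch:

import logging
from typing import List

import azure.functions as func


def main(event: List[func.EventHubEvent]):
    # With cardinality "many", the bound parameter is a list; each
    # element is one Event Hub event from the batch.
    for e in event:
        logging.info(e.get_body().decode())

With "cardinality": "one", the handler receives a single func.EventHubEvent instead.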

__init__.py

import logging
import azure.functions as func
from azure.storage.blob import BlobClient

# Placeholders: supply your storage account connection string and container name
storage_connection_string = 'Storage account connection string'
container_name = ''


def main(event: func.EventHubEvent):
    logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')
    blob_client = BlobClient.from_connection_string(
        storage_connection_string, container_name,
        str(event.sequence_number) + ".txt")
    blob_client.upload_blob(event.get_body().decode())

Following is a screenshot of my blob container.

After executing the above code, something got written to the blob container, but instead of a .txt file it got saved in some other format. Also, if I trigger the Azure function multiple times, the files get overwritten; I want to perform an append operation instead of an overwrite. Finally, I want to save my file in a user-defined location, for example: container/Year=/month=/date=. Thanks !!
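A minimal sketch of one way to get both behaviors, assuming the azure-storage-blob SDK (the connection string, container name, and path layout below are placeholders, not values from the question): build a date-partitioned blob name and upload with blob_type="AppendBlob", so the first call creates the blob and later calls append to it.

import datetime

from azure.storage.blob import BlobClient

storage_connection_string = '<storage account connection string>'  # placeholder
container_name = '<container name>'  # placeholder


def write_event_body(body: str) -> None:
    # Build a date-partitioned path, e.g. "Year=2024/Month=5/Date=17/events.txt"
    today = datetime.datetime.today()
    blob_name = f"Year={today.year}/Month={today.month}/Date={today.day}/events.txt"
    blob_client = BlobClient.from_connection_string(
        storage_connection_string, container_name, blob_name)
    # blob_type="AppendBlob": the first upload creates the blob, subsequent
    # uploads append to it instead of overwriting (overwrite defaults to False).
    blob_client.upload_blob(body, blob_type="AppendBlob")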

Upvotes: 1

Views: 453

Answers (1)

Stanley Gong

Reputation: 12153

If you want to read data from Azure Event Hub, using the Event Hub trigger will be much easier. This is my test code (read data and write it into storage):

import logging
import azure.functions as func
from azure.storage.blob import BlobClient
import datetime

# Placeholders: supply your storage account connection string and container name
storage_connection_string = ''
container_name = ''

today = datetime.datetime.today()


def main(event: func.EventHubEvent):
    logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')

    blob_client = BlobClient.from_connection_string(
        storage_connection_string, container_name,
        str(today.year) + "/" + str(today.month) + "/" + str(today.day) + ".txt")

    blob_client.upload_blob(event.get_body().decode(), blob_type="AppendBlob")
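Two notes on the snippet above: upload_blob with blob_type="AppendBlob" creates the blob on the first call and, since overwrite defaults to False, appends on subsequent calls, which gives the append-instead-of-overwrite behavior asked about. Also, today is evaluated once when the worker process starts; to stamp each event with the date it is actually processed, compute datetime.datetime.today() inside main instead.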

I use the code below to send events to the event hub:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="<conn string>", eventhub_name="<hub name>")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
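On Python 3.7+, asyncio.run(run()) is the more idiomatic way to drive this coroutine, though the get_event_loop approach above works in a standalone script.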

My local.settings.json:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "<storage account conn str>",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "testhubname0123_test_EVENTHUB": "<event hub conn str>"
    }
}

My function.json is just as this doc indicated:

{
    "scriptFile": "__init__.py",
    "bindings": [{
        "type": "eventHubTrigger",
        "name": "event",
        "direction": "in",
        "eventHubName": "test01(this is my hubname, pls palce yours here)",
        "connection": "testhubname0123_test_EVENTHUB"
    }]
}
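Note that "connection" holds the name of an app setting rather than the connection string itself, so it must match the key that stores the Event Hub connection string in local.settings.json (testhubname0123_test_EVENTHUB above).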

Result

Run the function and send data to the event hub using the code above.

Data has been saved into storage successfully.

Download the .txt file and check its content; we can see that the content of all 3 events has been written.

Upvotes: 1
