db533

Reputation: 95

Managing event hubs in Azure from Python

I'm trying to write Python scripts that create and delete event hubs on Azure Event Hubs. I have managed to create an EventHubManagementClient following the documentation on this page. I believe I now need to use the EventHubsOperations class as documented here.

I have 2 challenges:

  1. What would 'aaaa' be in the "from aaaa import EventHubsOperations" line, so that I can refer to the class? I can't find which package to import the class from.
  2. What values should I pass for config, serializer and deserializer, all of which are required when using the class? Perhaps someone can share an example of how to use this class?

Ideally I'm looking to call the create_or_update and delete methods to create a new event hub or to delete an existing one from the Python script. If someone can share how this code should be extended to achieve this, I'd appreciate it. The documentation seems incredibly light: "config, Required, Configuration of service client"...

My code is as follows:

import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential

setenv.import_env_vars('')

vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]

credential = DefaultAzureCredential()

print('Creating EH_client...')
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')

EventHubsOperations(EH_client)

The resulting output is as follows:

Project root: 
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
  File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
    EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined

Process finished with exit code 1

Upvotes: 0

Views: 866

Answers (2)

db533

Reputation: 95

Here's the code that worked for me for creating and deleting event hubs from Python.

I use a separate script (setenv.py) to load environment variables that are stored in a text file.

import os
import setenv
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)

set_env_path="C:\\Users\\db533\\PycharmProjects\\GunaBot2\\shared_files\\"
setenv.import_env_vars(set_env_path,'env_values')

def main():
    SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
    GROUP_NAME = "annabot-eventhub2"
    STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
    NAMESPACE_NAME = "annabot-eventhub999"
    EVENTHUB_NAME = "worker99901"

    tenant_id = os.environ["AZURE_TENANT_ID"]
    client_id = os.environ["AZURE_CLIENT_ID"]
    client_secret = os.environ["AZURE_CLIENT_SECRET"]
    # Do not print the secret itself; only confirm that it was loaded.
    print('AZURE_CLIENT_SECRET loaded:', bool(client_secret))

    credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)

    # Create client
    print(" Create resource client...")
    resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)

    print(" Create Event hub client...")
    eventhub_client = EventHubManagementClient(credential_common,SUBSCRIPTION_ID)

    print(" Create storage client...")
    storage_client = StorageManagementClient(credential_common,SUBSCRIPTION_ID)

    # Create resource group
    print(" Create resource group...")
    resource_client.resource_groups.create_or_update(
        GROUP_NAME,
        {"location": "germanywestcentral"}
    )

    # Create StorageAccount
    print(" Create storageAccount...")
    storage_async_operation = storage_client.storage_accounts.create(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME,
        StorageAccountCreateParameters(
            sku=Sku(name=SkuName.standard_lrs),
            kind=Kind.storage_v2,
            location='germanywestcentral'
        )
    )
    storage_account = storage_async_operation.result()

    # Create Namespace
    print(" Create event hub namespace...")
    eventhub_client.namespaces.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        {
          "sku": {
            "name": "Standard",
            "tier": "Standard"
          },
          "location": "Germany West Central",
          "tags": {
            "tag1": "value1",
            "tag2": "value2"
          },
          "kafka_enabled": "True"
        }
    ).result()

    # Create EventHub
    print(" Create event hub...")
    eventhub = eventhub_client.event_hubs.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME,
        {
          "message_retention_in_days": "4",
          "partition_count": "4",
          "status": "Active",
          "capture_description": {
            "enabled": True,
            "encoding": "Avro",
            "interval_in_seconds": "120",
            "size_limit_in_bytes": "10485763",
            "destination": {
              "name": "EventHubArchive.AzureBlockBlob",
              "storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "",
              "blob_container": "container",
              "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
          }
        }
    )
    print("Created EventHub: {}".format(eventhub))

    # Get EventHub
    eventhub = eventhub_client.event_hubs.get(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("get() for EventHub: {}\n".format(eventhub))

    # Create authorisation rule
    eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager",
        rights=["LISTEN","SEND"]
    )
    print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))

    # Get authorisation rule
    eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))

    # List keys
    namespace_keys = eventhub_client.event_hubs.list_keys(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("list_keys() for EventHub: {}\n".format(namespace_keys))
    print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)

    # Delete EventHub
    eventhub_client.event_hubs.delete(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("Delete EventHub.")

    # Delete Namespace
    eventhub_client.namespaces.delete(
        GROUP_NAME,
        NAMESPACE_NAME
    ).result()

    # Delete StorageAccount
    storage_client.storage_accounts.delete(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME
    )

    # Delete resource group
    resource_client.resource_groups.delete(
        GROUP_NAME
    ).result()


if __name__ == "__main__":
    main()
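The storage account resource ID in the capture settings above is built by string concatenation. A small helper can make that step easier to read and reuse. This is only a sketch: the function names `storage_account_resource_id` and `capture_description` and their default values are made up for illustration and are not part of the Azure SDK; the dictionary keys are the ones already used in the script above.

```python
def storage_account_resource_id(subscription_id, group_name, account_name):
    """Build the resource ID string of a storage account (hypothetical helper)."""
    return (
        "/subscriptions/{}/resourceGroups/{}"
        "/providers/Microsoft.Storage/storageAccounts/{}"
    ).format(subscription_id, group_name, account_name)


def capture_description(storage_id, container="container", interval_seconds=120):
    """Return a capture_description dict shaped like the one in the script above."""
    return {
        "enabled": True,
        "encoding": "Avro",
        "interval_in_seconds": interval_seconds,
        "destination": {
            "name": "EventHubArchive.AzureBlockBlob",
            "storage_account_resource_id": storage_id,
            "blob_container": container,
            # Placeholders in this string are left as-is for the service to fill in.
            "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/"
                                   "{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}",
        },
    }
```

The resulting dict could then be passed as the "capture_description" value in the create_or_update call, for example `capture_description(storage_account_resource_id(SUBSCRIPTION_ID, GROUP_NAME, STORAGE_ACCOUNT_NAME))`.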

The setenv.py script to load the environment variables is as follows. (I got this from another answer. Can't take credit for this...):

import os

def import_env_vars(env_folder,env_filename):
    """Imports some environment variables from a special .env file in the
    project root directory.
    """
    print("env_folder:",env_folder)
    if len(env_folder) > 0 and env_folder[-1] != '\\':
        env_folder += '\\'
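    try:
        print("filename:",env_folder+env_filename)
        envfile = open(env_folder+env_filename, "r")

    except IOError:
        raise Exception("You must have a {0} file in your project root "
                        "in order to run the server in your local machine. "
                        "This specifies some necessary environment variables. ".format(env_filename))
    # Close the file automatically once all lines have been read
    with envfile:
        for line in envfile:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            # Split on the first "=" only, so values may themselves contain "="
            key, value = line.split("=", 1)
            os.environ[key] = value
            # Print only the key; the value may be a secret
            print("key:",key)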

Environment variables are defined in the file as follows:

EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
DEV_STAGE=Dev
AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq
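For a file in this format, the parsing step can also be separated from the file handling, which makes it easier to test. The following is a minimal sketch using only the standard library; the names `parse_env_lines` and `load_env_file` are hypothetical, and treating lines that start with "#" as comments is an assumption that the original file format does not state.

```python
import os


def parse_env_lines(lines):
    """Return a dict of KEY=VALUE pairs, skipping blank lines and # comments."""
    values = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # partition() splits on the first "=" only, so values may contain "="
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError("Expected KEY=VALUE, got: {!r}".format(line))
        values[key.strip()] = value.strip()
    return values


def load_env_file(path):
    """Load the file at `path` into os.environ and return the variable names."""
    with open(path, "r") as envfile:
        values = parse_env_lines(envfile)
    os.environ.update(values)
    # Return names only, so that secret values are not echoed by callers
    return sorted(values)
```

Calling `load_env_file` with the path to the env_values file would set the same variables as setenv.py does.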

Hope this helps others.

Upvotes: 1

Utkarsh Pal

Reputation: 4544

Import the EventHubsOperations class using the statement below:

from azure.mgmt.eventhub.v2021_01_01_preview.operations import EventHubsOperations

config holds the configuration details of the service client.

serializer and deserializer are the objects that convert Python objects to and from the format sent over the wire, similar in spirit to how the pickle module dumps and loads objects to and from a byte stream.

Once you have these parameters ready, create an EventHubsOperations object, passing a value for each of them.

object_name = EventHubsOperations(client=(value), config=(value), serializer=(value), deserializer=(value))

With this object you can call the create_or_update and delete methods of the class, passing the required parameters.

object_name.create_or_update(resource_group_name, namespace_name, event_hub_name, parameters, **kwargs)

object_name.delete(resource_group_name, namespace_name, event_hub_name, **kwargs)
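In practice the class rarely needs to be constructed by hand: as the script in the other answer shows, an EventHubManagementClient already exposes a ready-made operations object as its `event_hubs` attribute. A minimal sketch, assuming `client` is an already authenticated EventHubManagementClient; the wrapper names `create_hub` and `delete_hub` and the default values are hypothetical:

```python
def create_hub(client, group, namespace, hub, partitions=4, retention_days=4):
    """Create or update an event hub through the client's event_hubs attribute."""
    return client.event_hubs.create_or_update(
        group,
        namespace,
        hub,
        {
            "message_retention_in_days": retention_days,
            "partition_count": partitions,
        },
    )


def delete_hub(client, group, namespace, hub):
    """Delete an event hub through the client's event_hubs attribute."""
    client.event_hubs.delete(group, namespace, hub)
```

Because the client builds the operations object itself, the config, serializer and deserializer arguments do not have to be supplied by the caller in this approach.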

You can also read the source code for azure.mgmt.eventhub.v2018_01_01_preview.operations.event

Upvotes: 0
