K V
K V

Reputation: 1

Chatbot does not deploy correctly to MS Azure

I've built a bot in Python, but it doesn't deploy correctly as a Web App in MS Azure. Here is the .py code I'm using:

import json
import uuid
import asyncio
import aiohttp
import re  # For parsing commands more effectively
import shlex  # To properly handle splitting with quotes
from dotenv import load_dotenv
from aiohttp import web
from botbuilder.core import BotFrameworkAdapter, BotFrameworkAdapterSettings, TurnContext
from botbuilder.schema import Activity

# Load environment variables from .env file
load_dotenv()

# Accessing variables from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
INGRAM_CLIENT_ID = os.getenv("INGRAM_CLIENT_ID")
INGRAM_CLIENT_SECRET = os.getenv("INGRAM_CLIENT_SECRET")
INGRAM_CUSTOMER_NUMBER = os.getenv("INGRAM_CUSTOMER_NUMBER")

async def get_access_token():
    """ Asynchronously obtain an access token from Ingram Micro's OAuth endpoint. """
    url = "https://api.ingrammicro.com:443/oauth/oauth30/token"  
    payload = {
        'grant_type': 'client_credentials',
        'client_id': INGRAM_CLIENT_ID,
        'client_secret': INGRAM_CLIENT_SECRET
    }
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=payload) as response:
            if response.status == 200:
                data = await response.json()
                # Store the time at which the token will expire
                expire_time = asyncio.get_running_loop().time() + int(data['expires_in']) - 300  # Adding buffer to renew before actual expiry
                return data['access_token'], expire_time
            else:
                print(f"Failed to obtain access token: {response.status}, {await response.text()}")
                return None, None

def parse_search_query(query):
    # Example: search product details for "hard drive", SSD, and "external storage"
    keyword_pattern = re.compile(r'search product details for (.+)', re.IGNORECASE)
    match = keyword_pattern.search(query)
    if match:
        search_text = match.group(1)
        # Using shlex.split to handle quoted words as single terms
        keywords = shlex.split(search_text)
        return keywords
    return []

def construct_params(keyword, filters=None):
    params = {
        'pageNumber': 1,
        'pageSize': 25,
        'type': 'IM::any',
        'keyword': keyword,
        'includeProductAttributes': 'true',
        'includePricing': 'true',
        'includeAvailability': 'true'
    }
    if filters:
        params.update(filters)
    return params

def format_response(products):
    formatted_products = []
    for product_data in products:
        for product in product_data.get('catalog', []):
            links_info = "No direct link available"
            if 'links' in product and product['links']:
                link = next((link for link in product['links'] if link.get('type') == 'GET'), None)
                links_info = link['href'] if link else links_info
            description = product.get('description', 'No description available')
            category = product.get('category', 'No category')
            vendor_name = product.get('vendorName', 'No vendor name')
            vendorPartNumber= product.get('vendorPartNumber', 'No vendor Part number')
            extraDescription = product.get('extraDescription', 'No Extended Description available')
            subCategory = product.get('subCategory', 'No subcategory')
            productType = product.get('productType', 'No product type')
            formatted_product = f"{vendor_name} - {description} - {category} - {subCategory} - {productType}\nPrice and availability: {links_info}"
            formatted_products.append(formatted_product)
    return "\n\n".join(formatted_products)  # Join all formatted product details with a newline

async def fetch_products(access_token, keywords):
    """Fetch and format products based on multiple keywords."""
    results = []
    url = 'https://api.ingrammicro.com:443/sandbox/resellers/v6/catalog'
    headers = {
        'Authorization': f'Bearer {access_token}',
        'IM-CustomerNumber': INGRAM_CUSTOMER_NUMBER,
        'IM-SenderID': 'MyCompany',
        'IM-CorrelationID': str(uuid.uuid4()),
        'IM-CountryCode': 'US',
        'Accept-Language': 'en',
        'Content-Type': 'application/json',
    }

    async with aiohttp.ClientSession() as session:
        for keyword in keywords:
            params = {
                'pageNumber': 1,
                'pageSize': 25,
                'type': 'IM::any',
                'keyword': keyword.strip(),  # Remove any extra whitespace around keywords
                'includeProductAttributes': 'true',
                'includePricing': 'true',
                'includeAvailability': 'true'
            }
            async with session.get(url, headers=headers, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    results.append(data)  # Append the response data to the results list
                else:
                    print(f"Failed API Call for keyword '{keyword}': {response.status}, {await response.text()}")
    return results  # Return the list of results for all keywords

async def get_intent_from_openai(query):
    prompt = f"What is the user looking for if they say: '{query}'? Is it a printer or printer supplies?"
    response = await ask_openai(prompt)
    return 'printer' if 'printer' in response.lower() and 'supply' not in response.lower() else 'cartridge'

async def ask_openai(prompt):
    """ Use OpenAI to process general inquiries based on user input using the chat model. """
    headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}
    payload = {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": prompt}]
    }
    url = "https://api.openai.com/v1/chat/completions"
    
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.post(url, headers=headers, json=payload) as response:
            if response.status == 200:
                data = await response.json()
                return data['choices'][0]['message']['content'].strip()
            else:
                print("Failed to process request with OpenAI:", response.status, await response.text())
                return "I had an error processing your request. Please try again later."

def format_product_details(product_details):
    formatted_products = []
    for product in product_details:
        ingram_part_number = product.get('ingramPartNumber', 'N/A').upper()  # Force uppercase for consistency
        description = product.get('description', 'No description available')  # Default description
        product_status_code = product.get('productStatusCode', 'N/A')
        product_status_message = product.get('productStatusMessage', 'No status message available')

        # Extracting availability details
        availability = product.get('availability', {})
        available = availability.get('available', False)
        total_availability = availability.get('totalAvailability', 0)

        # Extracting pricing details
        pricing = product.get('pricing', {})
        retail_price = pricing.get('retailPrice', 'N/A')
        customer_price = pricing.get('customerPrice', 'N/A')

        # Constructing the formatted product string using " | " as a delimiter
        formatted_product = (
            f"Product Number: {ingram_part_number} \n\n "
            f"Product Status Code: {product_status_code} - \n\n {product_status_message} \n\n "
            f"Description: {description} \n\n "
            f"Availability: {'Available' if available else 'Not Available'} \n\n "
            f"Total Availability: {total_availability} \n\n "
            f"Retail Price: {retail_price} \n\n "
            f"Customer Price: {customer_price}"
        )
        formatted_products.append(formatted_product)
    
    return "\n\n".join(formatted_products)  # Use "\n\n" to separate different products clearly

class EchoBot:
    def __init__(self):
        self.access_token, self.token_expire_time = asyncio.run(get_access_token())

    async def ensure_access_token(self):
        """ Ensure a valid access token is always available. """
        if not self.access_token or asyncio.get_running_loop().time() > self.token_expire_time:
            self.access_token, self.token_expire_time = await get_access_token()
            if not self.access_token:
                raise Exception("Unable to retrieve a valid token")

    async def fetch_price_and_availability(self, ingram_part_number):
        url = (f'https://api.ingrammicro.com:443/sandbox/resellers/v6/catalog/priceandavailability'
           f'?includePricing=true&includeAvailability=true&includeProductAttributes=true')

        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json',
            'IM-CustomerNumber': INGRAM_CUSTOMER_NUMBER,
            'IM-CountryCode': 'US',
            'IM-CorrelationID': str(uuid.uuid4()),
            'IM-SenderID': 'MyCompany',
            'Accept': 'application/json'
        }

        data = json.dumps({"products": [{"ingramPartNumber": ingram_part_number.upper()}]})  # Force uppercase for IngramPartNumber

        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, data=data) as response:
                if response.status == 200:
                    product_details = await response.json()
                    print("DEBUG: Full API Response:", product_details)  
                    return format_product_details(product_details)
                else:
                    error_message = await response.text()
                    print(f"Failed to fetch details: {response.status} - {error_message}")
                    return f"Failed to fetch details: {response.status} - {error_message}"
    async def on_turn(self, turn_context: TurnContext):
        """Handle incoming messages."""
        await self.ensure_access_token()  
        if turn_context.activity.type == 'message':
            user_message = turn_context.activity.text.lower()
            keyword_search = re.search(r"search product details for (.+)", user_message)
            product_id_search = re.search(r"price and availability for (\w+)", user_message)

            if keyword_search:
                keywords = keyword_search.group(1).split(',')  # Assuming keywords are comma-separated
                products_data = await fetch_products(self.access_token, keywords)
                response = format_response(products_data)
                await turn_context.send_activity(Activity(type="message", text=response))
            elif product_id_search:
                product_id = product_id_search.group(1)
                response = await self.fetch_price_and_availability(product_id)
                await turn_context.send_activity(Activity(type="message", text=response))
            else:
                response = await ask_openai(user_message)
                await turn_context.send_activity(Activity(type="message", text=response))

        elif turn_context.activity.type == 'conversationUpdate':
            if turn_context.activity.members_added:
                for member in turn_context.activity.members_added:
                    if member.id != turn_context.activity.recipient.id:
                        await turn_context.send_activity(Activity(type="message", text="Welcome to the Ingram Micro Bot! Type 'hello' to start or ask me anything."))

if __name__ == "__main__":
    settings = BotFrameworkAdapterSettings(app_id="", app_password="")
    adapter = BotFrameworkAdapter(settings)
    bot = EchoBot()
    app = web.Application()

    async def messages(req: web.Request) -> web.Response:
        if "application/json" in req.headers.get("Content-Type", ""):
            body = await req.json()
            activity = Activity.deserialize(body)
            auth_header = req.headers.get("Authorization", "")
            await adapter.process_activity(activity, auth_header, bot.on_turn)
            return web.Response(status=200)
        return web.Response(status=415)

    app.router.add_post("/api/messages", messages)
    web.run_app(app, host='localhost', port=3978)

I deployed the code via VSC Azure extension, but when I try to open the bot I get an Application error message.

Here is where the bot should be hosted: https://apollo001webapp.azurewebsites.net/

Thanks in advance.

Upvotes: 0

Views: 77

Answers (1)

Pravallika KV
Pravallika KV

Reputation: 8694

Add a init_func() and configure the host and port in the code in your app.py.

Sample Code Snippet:

def init_func(argv):
    APP = web.Application(middlewares=[aiohttp_error_middleware])
    APP.router.add_post("/api/messages", messages)
    return APP
if __name__ == "__main__":
    APP = init_func(None)

    try:
        web.run_app(APP, host="0.0.0.0", port=CONFIG.PORT)
    except Exception as error:
        raise error

Add App_ID and App_password in config.py.

class DefaultConfig:
    """ Bot Configuration """

    PORT = 3978
    APP_ID = os.environ.get("MicrosoftAppId", "49b71438-565b-440b-b549-7e53b9b0dbd4")
    APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "2r58Q~K7HT9D7uYZDg0rvQcoJKh4A9EqpHu_tdr7")

I have created a python bot and deployed to Azure.

App.py:

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import sys
import traceback
from datetime import datetime

from aiohttp import web
from aiohttp.web import Request, Response, json_response
from botbuilder.core import (BotFrameworkAdapter, BotFrameworkAdapterSettings,
                             TurnContext)
from botbuilder.core.integration import aiohttp_error_middleware
from botbuilder.schema import Activity, ActivityTypes

from bot import MyBot
from config import DefaultConfig

CONFIG = DefaultConfig()

# Create adapter.
# See https://aka.ms/about-bot-adapter to learn more about how bots work.

SETTINGS = BotFrameworkAdapterSettings(CONFIG.APP_ID, CONFIG.APP_PASSWORD)
ADAPTER = BotFrameworkAdapter(SETTINGS)


# Catch-all for errors.
async def on_error(context: TurnContext, error: Exception):
    # This check writes out errors to console log .vs. app insights.
    # NOTE: In production environment, you should consider logging this to Azure
    #       application insights.
    print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr)
    traceback.print_exc()

    # Send a message to the user
    await context.send_activity("The bot encountered an error or bug.")
    await context.send_activity(
        "To continue to run this bot, please fix the bot source code."
    )
    # Send a trace activity if we're talking to the Bot Framework Emulator
    if context.activity.channel_id == "emulator":
        # Create a trace activity that contains the error object
        trace_activity = Activity(
            label="TurnError",
            name="on_turn_error Trace",
            timestamp=datetime.utcnow(),
            type=ActivityTypes.trace,
            value=f"{error}",
            value_type="https://www.botframework.com/schemas/error",
        )
        # Send a trace activity, which will be displayed in Bot Framework Emulator
        await context.send_activity(trace_activity)


ADAPTER.on_turn_error = on_error

# Create the Bot
BOT = MyBot()


# Listen for incoming requests on /api/messages
async def messages(req: Request) -> Response:
    # Main bot message handler.
    if "application/json" in req.headers["Content-Type"]:
        body = await req.json()
    else:
        return Response(status=415)

    activity = Activity().deserialize(body)
    auth_header = req.headers["Authorization"] if "Authorization" in req.headers else ""

    response = await ADAPTER.process_activity(activity, auth_header, BOT.on_turn)
    if response:
        return json_response(data=response.body, status=response.status)
    return Response(status=201)


def init_func(argv):
    APP = web.Application(middlewares=[aiohttp_error_middleware])
    APP.router.add_post("/api/messages", messages)
    return APP
if __name__ == "__main__":
    APP = init_func(None)

    try:
        web.run_app(APP, host="0.0.0.0", port=CONFIG.PORT)
    except Exception as error:
        raise error
  • Deployed to Azure App Service.
  • Provide the web app's url as the messaging endpoint by adding /api/messages/ to it.

enter image description here

  • Add the Startup command in Web app=>settings=>Configuration=>Startup command:
python3 -m aiohttp.web -H 0.0.0.0 -P 8000 app:init_fun

enter image description here

Test the bot:

enter image description here

Upvotes: 0

Related Questions