Reputation: 1
I've built a bot in Python, but it doesn't deploy correctly as a Web App in MS Azure. Here is the .py code I'm using:
import json
import uuid
import asyncio
import aiohttp
import re # For parsing commands more effectively
import shlex # To properly handle splitting with quotes
from dotenv import load_dotenv
from aiohttp import web
from botbuilder.core import BotFrameworkAdapter, BotFrameworkAdapterSettings, TurnContext
from botbuilder.schema import Activity
# Load environment variables from .env file
load_dotenv()
# Accessing variables from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
INGRAM_CLIENT_ID = os.getenv("INGRAM_CLIENT_ID")
INGRAM_CLIENT_SECRET = os.getenv("INGRAM_CLIENT_SECRET")
INGRAM_CUSTOMER_NUMBER = os.getenv("INGRAM_CUSTOMER_NUMBER")
async def get_access_token():
""" Asynchronously obtain an access token from Ingram Micro's OAuth endpoint. """
url = "https://api.ingrammicro.com:443/oauth/oauth30/token"
payload = {
'grant_type': 'client_credentials',
'client_id': INGRAM_CLIENT_ID,
'client_secret': INGRAM_CLIENT_SECRET
}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=payload) as response:
if response.status == 200:
data = await response.json()
# Store the time at which the token will expire
expire_time = asyncio.get_running_loop().time() + int(data['expires_in']) - 300 # Adding buffer to renew before actual expiry
return data['access_token'], expire_time
else:
print(f"Failed to obtain access token: {response.status}, {await response.text()}")
return None, None
def parse_search_query(query):
# Example: search product details for "hard drive", SSD, and "external storage"
keyword_pattern = re.compile(r'search product details for (.+)', re.IGNORECASE)
match = keyword_pattern.search(query)
if match:
search_text = match.group(1)
# Using shlex.split to handle quoted words as single terms
keywords = shlex.split(search_text)
return keywords
return []
def construct_params(keyword, filters=None):
params = {
'pageNumber': 1,
'pageSize': 25,
'type': 'IM::any',
'keyword': keyword,
'includeProductAttributes': 'true',
'includePricing': 'true',
'includeAvailability': 'true'
}
if filters:
params.update(filters)
return params
def format_response(products):
formatted_products = []
for product_data in products:
for product in product_data.get('catalog', []):
links_info = "No direct link available"
if 'links' in product and product['links']:
link = next((link for link in product['links'] if link.get('type') == 'GET'), None)
links_info = link['href'] if link else links_info
description = product.get('description', 'No description available')
category = product.get('category', 'No category')
vendor_name = product.get('vendorName', 'No vendor name')
vendorPartNumber= product.get('vendorPartNumber', 'No vendor Part number')
extraDescription = product.get('extraDescription', 'No Extended Description available')
subCategory = product.get('subCategory', 'No subcategory')
productType = product.get('productType', 'No product type')
formatted_product = f"{vendor_name} - {description} - {category} - {subCategory} - {productType}\nPrice and availability: {links_info}"
formatted_products.append(formatted_product)
return "\n\n".join(formatted_products) # Join all formatted product details with a newline
async def fetch_products(access_token, keywords):
"""Fetch and format products based on multiple keywords."""
results = []
url = 'https://api.ingrammicro.com:443/sandbox/resellers/v6/catalog'
headers = {
'Authorization': f'Bearer {access_token}',
'IM-CustomerNumber': INGRAM_CUSTOMER_NUMBER,
'IM-SenderID': 'MyCompany',
'IM-CorrelationID': str(uuid.uuid4()),
'IM-CountryCode': 'US',
'Accept-Language': 'en',
'Content-Type': 'application/json',
}
async with aiohttp.ClientSession() as session:
for keyword in keywords:
params = {
'pageNumber': 1,
'pageSize': 25,
'type': 'IM::any',
'keyword': keyword.strip(), # Remove any extra whitespace around keywords
'includeProductAttributes': 'true',
'includePricing': 'true',
'includeAvailability': 'true'
}
async with session.get(url, headers=headers, params=params) as response:
if response.status == 200:
data = await response.json()
results.append(data) # Append the response data to the results list
else:
print(f"Failed API Call for keyword '{keyword}': {response.status}, {await response.text()}")
return results # Return the list of results for all keywords
async def get_intent_from_openai(query):
prompt = f"What is the user looking for if they say: '{query}'? Is it a printer or printer supplies?"
response = await ask_openai(prompt)
return 'printer' if 'printer' in response.lower() and 'supply' not in response.lower() else 'cartridge'
async def ask_openai(prompt):
""" Use OpenAI to process general inquiries based on user input using the chat model. """
headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}
payload = {
"model": "gpt-4-turbo",
"messages": [{"role": "user", "content": prompt}]
}
url = "https://api.openai.com/v1/chat/completions"
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status == 200:
data = await response.json()
return data['choices'][0]['message']['content'].strip()
else:
print("Failed to process request with OpenAI:", response.status, await response.text())
return "I had an error processing your request. Please try again later."
def format_product_details(product_details):
formatted_products = []
for product in product_details:
ingram_part_number = product.get('ingramPartNumber', 'N/A').upper() # Force uppercase for consistency
description = product.get('description', 'No description available') # Default description
product_status_code = product.get('productStatusCode', 'N/A')
product_status_message = product.get('productStatusMessage', 'No status message available')
# Extracting availability details
availability = product.get('availability', {})
available = availability.get('available', False)
total_availability = availability.get('totalAvailability', 0)
# Extracting pricing details
pricing = product.get('pricing', {})
retail_price = pricing.get('retailPrice', 'N/A')
customer_price = pricing.get('customerPrice', 'N/A')
# Constructing the formatted product string using " | " as a delimiter
formatted_product = (
f"Product Number: {ingram_part_number} \n\n "
f"Product Status Code: {product_status_code} - \n\n {product_status_message} \n\n "
f"Description: {description} \n\n "
f"Availability: {'Available' if available else 'Not Available'} \n\n "
f"Total Availability: {total_availability} \n\n "
f"Retail Price: {retail_price} \n\n "
f"Customer Price: {customer_price}"
)
formatted_products.append(formatted_product)
return "\n\n".join(formatted_products) # Use "\n\n" to separate different products clearly
class EchoBot:
def __init__(self):
self.access_token, self.token_expire_time = asyncio.run(get_access_token())
async def ensure_access_token(self):
""" Ensure a valid access token is always available. """
if not self.access_token or asyncio.get_running_loop().time() > self.token_expire_time:
self.access_token, self.token_expire_time = await get_access_token()
if not self.access_token:
raise Exception("Unable to retrieve a valid token")
async def fetch_price_and_availability(self, ingram_part_number):
url = (f'https://api.ingrammicro.com:443/sandbox/resellers/v6/catalog/priceandavailability'
f'?includePricing=true&includeAvailability=true&includeProductAttributes=true')
headers = {
'Authorization': f'Bearer {self.access_token}',
'Content-Type': 'application/json',
'IM-CustomerNumber': INGRAM_CUSTOMER_NUMBER,
'IM-CountryCode': 'US',
'IM-CorrelationID': str(uuid.uuid4()),
'IM-SenderID': 'MyCompany',
'Accept': 'application/json'
}
data = json.dumps({"products": [{"ingramPartNumber": ingram_part_number.upper()}]}) # Force uppercase for IngramPartNumber
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=data) as response:
if response.status == 200:
product_details = await response.json()
print("DEBUG: Full API Response:", product_details)
return format_product_details(product_details)
else:
error_message = await response.text()
print(f"Failed to fetch details: {response.status} - {error_message}")
return f"Failed to fetch details: {response.status} - {error_message}"
async def on_turn(self, turn_context: TurnContext):
"""Handle incoming messages."""
await self.ensure_access_token()
if turn_context.activity.type == 'message':
user_message = turn_context.activity.text.lower()
keyword_search = re.search(r"search product details for (.+)", user_message)
product_id_search = re.search(r"price and availability for (\w+)", user_message)
if keyword_search:
keywords = keyword_search.group(1).split(',') # Assuming keywords are comma-separated
products_data = await fetch_products(self.access_token, keywords)
response = format_response(products_data)
await turn_context.send_activity(Activity(type="message", text=response))
elif product_id_search:
product_id = product_id_search.group(1)
response = await self.fetch_price_and_availability(product_id)
await turn_context.send_activity(Activity(type="message", text=response))
else:
response = await ask_openai(user_message)
await turn_context.send_activity(Activity(type="message", text=response))
elif turn_context.activity.type == 'conversationUpdate':
if turn_context.activity.members_added:
for member in turn_context.activity.members_added:
if member.id != turn_context.activity.recipient.id:
await turn_context.send_activity(Activity(type="message", text="Welcome to the Ingram Micro Bot! Type 'hello' to start or ask me anything."))
if __name__ == "__main__":
settings = BotFrameworkAdapterSettings(app_id="", app_password="")
adapter = BotFrameworkAdapter(settings)
bot = EchoBot()
app = web.Application()
async def messages(req: web.Request) -> web.Response:
if "application/json" in req.headers.get("Content-Type", ""):
body = await req.json()
activity = Activity.deserialize(body)
auth_header = req.headers.get("Authorization", "")
await adapter.process_activity(activity, auth_header, bot.on_turn)
return web.Response(status=200)
return web.Response(status=415)
app.router.add_post("/api/messages", messages)
web.run_app(app, host='localhost', port=3978)
I deployed the code via VSC Azure extension, but when I try to open the bot I get an Application error message.
Here is where the bot should be hosted: https://apollo001webapp.azurewebsites.net/
Thanks in advance.
Upvotes: 0
Views: 77
Reputation: 8694
Add a init_func()
and configure the host and port in the code in your app.py
.
Sample Code Snippet:
def init_func(argv):
APP = web.Application(middlewares=[aiohttp_error_middleware])
APP.router.add_post("/api/messages", messages)
return APP
if __name__ == "__main__":
APP = init_func(None)
try:
web.run_app(APP, host="0.0.0.0", port=CONFIG.PORT)
except Exception as error:
raise error
Add App_ID
and App_password
in config.py
.
class DefaultConfig:
""" Bot Configuration """
PORT = 3978
APP_ID = os.environ.get("MicrosoftAppId", "49b71438-565b-440b-b549-7e53b9b0dbd4")
APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "2r58Q~K7HT9D7uYZDg0rvQcoJKh4A9EqpHu_tdr7")
I have created a python bot and deployed to Azure.
App.py:
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import sys
import traceback
from datetime import datetime
from aiohttp import web
from aiohttp.web import Request, Response, json_response
from botbuilder.core import (BotFrameworkAdapter, BotFrameworkAdapterSettings,
TurnContext)
from botbuilder.core.integration import aiohttp_error_middleware
from botbuilder.schema import Activity, ActivityTypes
from bot import MyBot
from config import DefaultConfig
CONFIG = DefaultConfig()
# Create adapter.
# See https://aka.ms/about-bot-adapter to learn more about how bots work.
SETTINGS = BotFrameworkAdapterSettings(CONFIG.APP_ID, CONFIG.APP_PASSWORD)
ADAPTER = BotFrameworkAdapter(SETTINGS)
# Catch-all for errors.
async def on_error(context: TurnContext, error: Exception):
# This check writes out errors to console log .vs. app insights.
# NOTE: In production environment, you should consider logging this to Azure
# application insights.
print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr)
traceback.print_exc()
# Send a message to the user
await context.send_activity("The bot encountered an error or bug.")
await context.send_activity(
"To continue to run this bot, please fix the bot source code."
)
# Send a trace activity if we're talking to the Bot Framework Emulator
if context.activity.channel_id == "emulator":
# Create a trace activity that contains the error object
trace_activity = Activity(
label="TurnError",
name="on_turn_error Trace",
timestamp=datetime.utcnow(),
type=ActivityTypes.trace,
value=f"{error}",
value_type="https://www.botframework.com/schemas/error",
)
# Send a trace activity, which will be displayed in Bot Framework Emulator
await context.send_activity(trace_activity)
ADAPTER.on_turn_error = on_error
# Create the Bot
BOT = MyBot()
# Listen for incoming requests on /api/messages
async def messages(req: Request) -> Response:
# Main bot message handler.
if "application/json" in req.headers["Content-Type"]:
body = await req.json()
else:
return Response(status=415)
activity = Activity().deserialize(body)
auth_header = req.headers["Authorization"] if "Authorization" in req.headers else ""
response = await ADAPTER.process_activity(activity, auth_header, BOT.on_turn)
if response:
return json_response(data=response.body, status=response.status)
return Response(status=201)
def init_func(argv):
APP = web.Application(middlewares=[aiohttp_error_middleware])
APP.router.add_post("/api/messages", messages)
return APP
if __name__ == "__main__":
APP = init_func(None)
try:
web.run_app(APP, host="0.0.0.0", port=CONFIG.PORT)
except Exception as error:
raise error
/api/messages/
to it.Web app=>settings=>Configuration=>Startup command
:python3 -m aiohttp.web -H 0.0.0.0 -P 8000 app:init_fun
Test the bot:
Upvotes: 0