Reputation: 1
I've been trying to resolve this issue for a while now but I can't seem to find a solution.
So as the title suggests, I'm trying to run a long-running action in Logic Apps via Functions. I have Python code that compares 2 Excel files and does a small transformation. It works fine; however, the Excel file contains almost 20k rows (it's dynamic and more rows will be added in the future), so it takes a while to finish. The actual Python code runs for around 8 minutes, but when I convert the same code to an Azure Function, it runs for about 11 minutes locally. This will obviously hit the timeout limit for Logic Apps. While doing some research I found that using an HTTP webhook is the easiest feasible workaround.
Sample blog I found https://medium.com/@jeffhollan/calling-long-running-functions-from-logic-apps-6d7ba5044701
The idea is to create 2 functions. The first is invoked by the webhook, receives a callbackUri, and returns an immediate 202 response. The second executes the long-running action and calls the callbackUri once done.
All of the sample implementations were in C# or other languages, with no Python sample, so I ended up using threads.
import logging
import threading
import time

import azure.functions as func
import requests

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.function_name("http_trigger1")
@app.route("http_trigger1")
def http_trigger1(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Webhook request from Logic Apps received.')

    callback_url = req.params.get('callback_url')
    data = req.params.get('data')
    if not callback_url:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            callback_url = req_body.get('callback_url')
            data = req_body.get('data')

    if not callback_url:
        return func.HttpResponse(
            "Enter a valid callback URL",
            status_code=400
        )

    try:
        # Start the long-running work in the background and
        # immediately return 202 Accepted
        threading.Thread(target=process_and_callback, args=(callback_url, data)).start()
        return func.HttpResponse(
            "Accepted for processing",
            status_code=202
        )
    except Exception as e:
        logging.error(f'An error occurred: {str(e)}')
        return func.HttpResponse(
            "Error occurred while invoking callback",
            status_code=500
        )

def process_and_callback(callback_url: str, data: str) -> None:
    try:
        time.sleep(720)  # simulate a 12-minute long-running action
        callback_data = {
            "Subject": data
        }
        # Make the callback request to resume the Logic App
        response = requests.post(callback_url, json=callback_data)
        response.raise_for_status()
        logging.info(f'Callback successful with status code: {response.status_code}')
    except Exception as e:
        logging.error(f'An error occurred while invoking callback: {str(e)}')
The 12-minute sleep simulates a long-running action. This seemed to work and it did bypass the timeout limit in Logic Apps. However, here's the problem: once I replace the sleep with my actual code, Logic Apps fails with a "RequestEntityTooLarge" error. My request body consists of callbackUri, file1, and file2 (both files base64 encoded).
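For completeness, the question doesn't show how file1 and file2 are turned back into something pandas can read. Assuming they arrive as Base64 strings in the JSON body, a minimal helper (the name decode_excel is mine, not from the original code) might look like:

```python
import base64
import io


def decode_excel(b64_payload: str) -> io.BytesIO:
    """Decode one base64-encoded file from the request body into an
    in-memory buffer that pd.read_excel() can consume directly."""
    return io.BytesIO(base64.b64decode(b64_payload))


# Hypothetical usage inside process_and_callback:
#   wb_file1 = pd.read_excel(decode_excel(req_body.get('file1')))
```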
I'm hoping someone can help me with this. Happy to share more information if needed.
Edit: I should also note that I tested this using a dummy file (a file with fewer rows), and with the approach above it worked and executed in only 3 minutes. The problem only occurs when working with the actual file.
Edit2: This is a snippet of the actual long-running action:
import openpyxl
import pandas as pd

# Read the Excel file 'file1'
wb_file1 = pd.read_excel(file1)
column_A = [f"{wb_file1.iloc[i, 7]},{wb_file1.iloc[i, 20]},{wb_file1.iloc[i, 21]}"
            for i in range(1, len(wb_file1))]

# Read the Excel file 'file2'
wb_file2 = pd.read_excel(file2, sheet_name=sheet_name)

column_a_out = {}
duplicates = []
for i, a in enumerate(column_A):
    row_number = wb_file2.index[(wb_file2.iloc[:, 7].astype(str) + ',' +
                                 wb_file2.iloc[:, 20].astype(str) + ',' +
                                 wb_file2.iloc[:, 21].astype(str)) == a].tolist()
    if len(row_number) == 1:  # exactly one match in file2
        row = int(row_number[0]) + 2
        column_a = a
        column_f = f"{wb_file1.iloc[i + 1, 5]}"
        column_ad = f"{wb_file1.iloc[i + 1, 29]}"
        column_af = f"{wb_file1.iloc[i + 1, 31]}"
        column_a_out[i + 1] = [row, column_a, column_f, column_ad, column_af]
    else:
        duplicates.append(a)

# Load the workbook into openpyxl for manipulation
wb_file2 = openpyxl.load_workbook(file2)
sheet_file2 = wb_file2[sheet_name]
for items in column_a_out:
    row_num, a, f, ad, af = column_a_out[items]
    sheet_file2.cell(row=row_num, column=6, value="" if f == 'nan' else f)
    sheet_file2.cell(row=row_num, column=30, value="" if ad == 'nan' else ad)
    sheet_file2.cell(row=row_num, column=32, value="" if af == 'nan' else af)
Upvotes: 0
Views: 145
Reputation: 977
Here are a couple of suggestions. I'd recommend the second one.
Your two-function approach sounds like you're serialising the Excel files (Base64 in your case) and posting them in the request to the function. My guess is that the Logic Apps "RequestEntityTooLarge" error occurs because you're hitting the limit on how much you can send. This is a bit of a guess; I don't have any personal experience with Logic Apps.
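To illustrate why the Base64 payload grows so quickly: Base64 encodes every 3 input bytes as 4 ASCII characters, so each file inflates by roughly a third before it even leaves Logic Apps. A quick stdlib-only check:

```python
import base64

# Simulate a ~3 MB Excel file as raw bytes
raw = b"\x00" * (3 * 1024 * 1024)
encoded = base64.b64encode(raw)

# Base64 maps every 3 input bytes to 4 output characters,
# so the payload grows by ~33%
print(len(raw))                  # 3145728
print(len(encoded))              # 4194304
print(len(encoded) / len(raw))   # 1.333...
```

Two such files in one JSON body can therefore push a few-MB upload past a request-size limit that the raw binaries would have fit under.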
So instead, have Logic Apps write the Excel files somewhere first - perhaps simply to a Blob container. It can then call the long-running function, passing the blob names, so the function can read the files directly from Blob Storage. You then avoid the large Base64 strings.
20k rows of data really isn't a lot. If it's taking around 10 minutes to run, I'd suggest there's a lot of scope for optimising your algorithm. This might be easier than building a complex interaction of multiple functions and long-running jobs.
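To give a concrete idea of the kind of optimisation I mean: the per-row .index[...] lookup in your Edit2 snippet rescans all of file2 for every row of file1, which is O(n²) - around 400 million comparisons at 20k rows. Building the composite key once on both frames and doing a single vectorised join avoids that. A rough sketch (the function name match_rows and the _key/_row column names are mine; the column positions 7/20/21 are taken from your snippet):

```python
import pandas as pd


def match_rows(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Join df1 to df2 on a composite key of three columns in one
    vectorised pass, instead of rescanning df2 once per df1 row."""
    def key(df: pd.DataFrame) -> pd.Series:
        # Same composite key the original loop builds row by row
        return (df.iloc[:, 7].astype(str) + ',' +
                df.iloc[:, 20].astype(str) + ',' +
                df.iloc[:, 21].astype(str))

    left = df1.assign(_key=key(df1))
    right = df2.assign(_key=key(df2), _row=df2.index)
    # One hash join replaces ~20k linear scans of df2
    return left.merge(right[['_key', '_row']], on='_key', how='left')
```

Rows with no match in df2 come back with _row set to NaN, and duplicated keys in df2 fan out into extra rows, so both the "no match" and "duplicates" cases from your loop are still detectable afterwards.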
Would you consider expanding on your algorithm so people can suggest how to speed it up?
Upvotes: 0