Reputation: 2129
Recently, I asked a question about how to track the progress of a for loop inside a deployed API. Here's the link.
The solution code that worked for me is:
from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid

context = {'jobs': {}}

app = FastAPI()


async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file_number, file in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    jobs[job_key]['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }
This way, I can track the progress of the for loop inside do_work using the identifier, by calling the status endpoint.
Now, I am looking for a way to parallelize the for loop inside the do_work method.
But if I use joblib, I don't know how to track each file being processed: the iteration count becomes meaningless because all files are processed in parallel.
Note: I just gave an example with joblib because I am not very familiar with other libraries. The processing of each file is fairly heavy, CPU-bound work: I preprocess the file, load 4 TensorFlow models, run predictions on the file, and write the results to a SQL database.
If anyone knows a way to do this, please share and help me out.
Upvotes: 9
Views: 5560
Reputation: 18118
If you receive many requests and the processing time is large, parallelizing the work across multiple threads could potentially starve API clients. So make sure you limit the number of threads (or processes/executors - see below) per call to a small number.
You could use pyspark to distribute the file paths to executors (each runs as its own process) that do the work; you can have multiple executors per machine, and you can distribute the work across multiple machines.
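A minimal, untested sketch of that idea (process_file and the file_paths list are placeholders, not from the question):

from pyspark.sql import SparkSession

def process_file(path):
    # heavy, CPU-bound work on a single file goes here
    return path

spark = SparkSession.builder.appName("file-jobs").getOrCreate()
file_paths = ["file1.bin", "file2.bin"]  # hypothetical input paths
# each partition of the list is processed by an executor process
results = spark.sparkContext.parallelize(file_paths).map(process_file).collect()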
Another option would be to use a thread pool via concurrent.futures, with max_workers limiting the number of threads per request, and pass a shared collection to the threads at launch so they can "report" their progress by writing to it (you can wrap a regular collection with a lock, since Python doesn't provide spin-lock-based concurrent collections).
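A rough, untested sketch of that approach, using a dict wrapped with a lock as the "report" collection (names like progress and work_on_file are illustrative, not from the question):

import threading
from concurrent.futures import ThreadPoolExecutor

progress = {'completed': 0}        # shared collection the threads report into
progress_lock = threading.Lock()   # a regular dict guarded by a lock

def work_on_file(file):
    # heavy per-file work would go here
    with progress_lock:
        progress['completed'] += 1

files = range(10)  # stand-in for the real list of files
with ThreadPoolExecutor(max_workers=4) as pool:  # limit threads per request
    list(pool.map(work_on_file, files))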
Upvotes: 0
Reputation: 4476
It seems that the Parallel function of joblib is blocking the thread that answers requests.
A probably better solution is @Ron Serruya's, which manages not to block the main thread.
Here's a potential solution. Note that I didn't test it, but it should be enough to give you a rough idea. Also, I wasn't 100% sure of what you needed, so it definitely needs a review from your side before running it.
That said, I don't understand why you don't use a database to keep the status of the iteration (a sketch follows the code below). That way, you avoid concurrency (race) issues and keep the state of the iteration/training even in case of a power failure.
from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid
from joblib import Parallel, delayed

context = {'jobs': {}}

app = FastAPI()


def parallelize(iterate_count):
    # Function part that needs to be run in parallel
    iterate_count += 1


async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs'][job_key]
    jobs["iteration"] = 0
    jobs['status'] = 'inprogress'
    Parallel()(delayed(parallelize)(jobs["iteration"]) for file, file_number in enumerate(iter_over))
    jobs['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }
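For the database suggestion above, a minimal untested sketch with sqlite3 (any database works; the jobs table and helper names are made up for illustration):

import sqlite3

conn = sqlite3.connect('jobs.db', check_same_thread=False)
conn.execute('CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, iteration INTEGER, status TEXT)')

def start_job(identifier):
    conn.execute('INSERT OR REPLACE INTO jobs VALUES (?, 0, ?)', (identifier, 'inprogress'))
    conn.commit()

def report_progress(identifier, iteration):
    conn.execute('UPDATE jobs SET iteration = ? WHERE id = ?', (iteration, identifier))
    conn.commit()

def job_status(identifier):
    row = conn.execute('SELECT iteration, status FROM jobs WHERE id = ?', (identifier,)).fetchone()
    return {'iteration': row[0], 'status': row[1]} if row else 'job with that identifier is undefined'

The /status endpoint would then read from the database instead of the in-memory context dict, so the state survives a restart.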
Upvotes: 0
Reputation: 4446
I'm not 100% sure I understood; would something like this work?
async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['iteration'] = 0

    async def do_work_inner(file):
        # do the work on the file here
        job_info['iteration'] += 1
        await asyncio.sleep(0.5)

    tasks = [do_work_inner(file) for file in iter_over]
    job_info['status'] = 'inprogress'
    await asyncio.gather(*tasks)
    jobs[job_key]['status'] = 'done'
This will run all of the work on the files in parallel*. Keep in mind that in this case job_info['iteration'] is mostly meaningless, since all the tasks start together and increase the value together.
Pay attention: what really matters is the actual kind of work you want to perform on the files. If it's a CPU-bound job (calculations, analysis, etc.), as opposed to a mainly IO-bound job like web calls, then this is the wrong solution and needs to be tweaked a bit; if so, let me know and I'll try to update it.
Edit: updated version for CPU-bound work; progress now shows the number of files completed.
This is a relatively complete example, just without the actual server.
import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor

jobs = {}
context = {}
executor = ProcessPoolExecutor()


def do_work_per_file(file, file_number):
    # CPU related work here, this method is not async
    # do the work on the file here
    print(f'Starting work on file {file_number}')
    time.sleep(random.randint(1, 10) / 10)
    return file_number


async def do_work(job_key, files=None):
    iter_over = files if files else range(15)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['completed'] = 0
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(executor, do_work_per_file, file, file_number)
             for file_number, file in enumerate(iter_over)]
    job_info['status'] = 'inprogress'
    for completed_job in asyncio.as_completed(tasks):
        print(f'Finished work on file {await completed_job}')
        job_info['completed'] += 1
        print('Current job status is ', job_info)
    jobs[job_key]['status'] = 'done'
    print('Current job status is ', job_info)


if __name__ == '__main__':
    context['jobs'] = jobs
    jobs['abc'] = {}
    asyncio.run(do_work('abc'))
The output is
Starting work on file 0
Starting work on file 1
Starting work on file 2
Starting work on file 3
Starting work on file 4
Starting work on file 5
Starting work on file 6
Starting work on file 7
Starting work on file 8
Starting work on file 9
Starting work on file 10
Starting work on file 11
Starting work on file 12
Starting work on file 13
Starting work on file 14
Finished work on file 1
Current job status is {'completed': 1, 'status': 'inprogress'}
Finished work on file 7
Current job status is {'completed': 2, 'status': 'inprogress'}
Finished work on file 9
Current job status is {'completed': 3, 'status': 'inprogress'}
Finished work on file 12
Current job status is {'completed': 4, 'status': 'inprogress'}
Finished work on file 11
Current job status is {'completed': 5, 'status': 'inprogress'}
Finished work on file 13
Current job status is {'completed': 6, 'status': 'inprogress'}
Finished work on file 4
Current job status is {'completed': 7, 'status': 'inprogress'}
Finished work on file 14
Current job status is {'completed': 8, 'status': 'inprogress'}
Finished work on file 0
Current job status is {'completed': 9, 'status': 'inprogress'}
Finished work on file 6
Current job status is {'completed': 10, 'status': 'inprogress'}
Finished work on file 2
Current job status is {'completed': 11, 'status': 'inprogress'}
Finished work on file 3
Current job status is {'completed': 12, 'status': 'inprogress'}
Finished work on file 8
Current job status is {'completed': 13, 'status': 'inprogress'}
Finished work on file 5
Current job status is {'completed': 14, 'status': 'inprogress'}
Finished work on file 10
Current job status is {'completed': 15, 'status': 'inprogress'}
Current job status is {'completed': 15, 'status': 'done'}
Basically, what changed is that you now open a new process pool to handle the work on the files; being a separate process also means that CPU-intensive work will not block your event loop and stop you from querying the status of the job.
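If you want to wire this into the question's FastAPI routes, a rough untested sketch could look like the following (it reuses do_work and the dicts from the example above, and schedules the job with asyncio.create_task since we're already on the running loop):

from fastapi import FastAPI
import asyncio
import uuid

app = FastAPI()
context['jobs'] = jobs  # reuse the dicts defined in the example above


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.create_task(do_work(identifier))  # do_work from the example above
    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }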
Upvotes: 4