Reputation: 1
I am building a Flask app that serves facial embeddings using DeepFace. My goal is to serve approximately 50 clients at an estimated 10 requests per minute. Each request runs DeepFace.represent() to process an image, which requires GPU resources.
Here’s my setup:
app.py: Main Flask app.
model.py: Loads the DeepFace models and includes the represent function.
database.py: Handles database interactions.
wsgi.py: Entrypoint for the Gunicorn server.
gunicorn.conf: Gunicorn configuration file.
I want to run the app with 5 Gunicorn workers, each using a separate logical GPU device. However, I am running into problems dividing the GPU:
GPU Logical Devices: The creation of logical GPU devices in TensorFlow (e.g., tf.config.experimental.set_virtual_device_configuration) needs to be tied to individual worker processes. Placing this configuration in model.py (where the models are loaded) doesn't seem to work; all workers end up sharing the same GPU resources.
Worker Parallelization: Should I use synchronous (sync) or asynchronous (async) workers in Gunicorn for this type of workload? I want efficient parallelization without overloading the GPU.
Ray Integration: I'm considering using Ray to manage the parallel processing. If Ray is necessary, which parts of the app would you recommend integrating it with?
Key Question:
How can I configure Gunicorn workers so that each one uses a separate logical GPU device, ensuring that DeepFace.represent() runs in parallel without resource contention? Any advice on where the GPU division logic should live, or how to structure the app to achieve this, would be greatly appreciated.
Below is my code:
# wsgi.py
from app import create_app
import os

# To install boto3, run:
#   pip install boto3
import boto3

# AWS credentials are loaded automatically from the default chain
# (environment variables, ~/.aws/credentials, or IAM role)
session = boto3.Session()

app = create_app()
app.secret_key = os.urandom(24)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
# model.py
import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

import cv2
import GPUtil
import numpy as np
import psutil
import tensorflow as tf
from deepface import DeepFace
from deepface.modules import modeling  # DeepFace's internal modeling module
from mtcnn import MTCNN

logger = logging.getLogger(__name__)


class ModelLoader:
    """
    Singleton class for loading and managing face detection and recognition models.
    This class ensures models are loaded only once and shared across the application.
    """
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ModelLoader, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        """Initialize model attributes and load models."""
        # Load memory configuration
        self.config = self.load_config()
        # self.gpu_memory_limit = self.config.get('gpu_memory_limit', 1024)  # default 1 GB
        # Create or open GPU log file
        self.gpu_log_file = "gpu.log"
        with open(self.gpu_log_file, "a") as f:
            f.write("\n--- New Session Started ---\n")
        self.facenet512_model = None
        self.retinaface_model = None
        self.mtcnn = None
        self.opencv = None
        self.load_models()
    def load_config(self):
        """Load configuration from the config.json file."""
        try:
            with open("config.json", "r") as file:
                config = json.load(file)
            return config
        except FileNotFoundError:
            # Return the default configuration if the file does not exist
            return {
                "gpu_memory_limit": 1024,  # 1 GB
                "allow_growth": True
            }
    def get_model(self, model_name):
        """
        Get a specific model by name.

        Args:
            model_name (str): Name of the model to retrieve.

        Returns:
            The requested model instance.

        Raises:
            ValueError: If the requested model is not found.
        """
        if model_name == "Facenet512":
            return self.facenet512_model
        else:
            raise ValueError(f"Model {model_name} not found.")
    def load_models(self):
        """Load all required models if they haven't been loaded already."""
        if self.facenet512_model is None:
            start_time = time.time()
            logger.info("Loading Facenet512 model...")
            try:
                self.facenet512_model = DeepFace.build_model("Facenet512")
                self.retinaface_model = modeling.build_model(task="face_detector", model_name="retinaface")
                self.mtcnn = modeling.build_model(task="face_detector", model_name="mtcnn")
                self.opencv = modeling.build_model(task="face_detector", model_name="opencv")
                end_time = time.time()
                logger.info(f"Facenet512 model loaded in {end_time - start_time:.2f} seconds.")
            except Exception as e:
                logger.error(f"Error loading models: {str(e)}")
                raise
    def clear_memory(self):
        """Clear memory for TensorFlow only (no GPU operations)."""
        try:
            tf.keras.backend.clear_session()
        except ImportError:
            pass
    def _log_resource_usage(self, operation: str):
        """Log GPU and CPU memory usage to the gpu.log file."""
        try:
            # Get CPU usage
            cpu_percent = psutil.cpu_percent()
            ram_percent = psutil.virtual_memory().percent
            # Get GPU usage if available
            gpu_info = ""
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_info += f"GPU {gpu.id}: Memory Use {gpu.memoryUsed}MB/{gpu.memoryTotal}MB ({gpu.memoryUtil*100:.1f}%) "
            # Create log entry
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            log_entry = f"[{timestamp}] {operation} - CPU: {cpu_percent}%, RAM: {ram_percent}%, {gpu_info}\n"
            # Write to log file
            with open(self.gpu_log_file, "a") as f:
                f.write(log_entry)
        except Exception as e:
            logger.error(f"Error logging resource usage: {str(e)}")
    def _is_frontal_face(self, facial_area: Dict[str, Any], threshold: float = 0.1) -> bool:
        """
        Check if the face is frontal based on eye positions and face area.

        Args:
            facial_area: Dictionary containing face and eye coordinates
            threshold: Tolerance threshold for determining if face is frontal

        Returns:
            bool: True if face is frontal, False otherwise
        """
        try:
            left_eye = facial_area["left_eye"]
            right_eye = facial_area["right_eye"]
            face_x, face_y, face_w, face_h = facial_area["x"], facial_area["y"], facial_area["w"], facial_area["h"]
            # Euclidean distance between the eyes
            eye_distance = np.linalg.norm(np.array(left_eye) - np.array(right_eye))
            # Ratio between eye distance and face width
            eye_face_width_ratio = eye_distance / face_w
            # Vertical eye symmetry
            eye_height_difference = abs(left_eye[1] - right_eye[1]) / face_h
            return eye_height_difference < threshold and 0.3 < eye_face_width_ratio < 0.6
        except Exception as e:
            logger.error(f"Error checking frontal face: {str(e)}")
            return False
    def get_embeddings(
        self,
        img_path: Union[str, np.ndarray],
        model_name: str = "Facenet512",
        enforce_detection: bool = False,
        detector_backend: str = "mtcnn",
        align: bool = True,
        expand_percentage: int = 0,
        normalization: str = "Facenet",
        anti_spoofing: bool = False,
        max_faces: Optional[int] = 1,
    ) -> Union[List[Dict[str, Any]], Dict[str, int]]:
        """
        Get facial embeddings using the specified model.
        Returns {"resultado": 4} if no face is detected or the face is not frontal.
        """
        try:
            start_time = time.time()
            self._log_resource_usage("Before embeddings generation")
            embeddings = DeepFace.represent(
                img_path=img_path,
                model_name=model_name,
                enforce_detection=enforce_detection,
                detector_backend=detector_backend,
                align=align,
                expand_percentage=expand_percentage,
                normalization=normalization,
                anti_spoofing=anti_spoofing,
                max_faces=max_faces,
            )
            if not embeddings:
                logger.warning("No faces detected")
                return {"resultado": 4}
            # Check if face is frontal
            if not self._is_frontal_face(embeddings[0]["facial_area"]):
                logger.warning("Face is not frontal")
                return {"resultado": 4}
            end_time = time.time()
            self._log_resource_usage("After embeddings generation")
            # Store result before cleanup
            result = embeddings.copy()
            self.clear_memory()
            return result
        except Exception as e:
            logger.error(f"Error getting embeddings: {str(e)}")
            raise
    def extract_faces(
        self,
        img_path: Union[str, np.ndarray],
        detector_backend: str = "mtcnn",
        enforce_detection: bool = True,
        align: bool = True,
        expand_percentage: int = 30,
        grayscale: bool = False,
        color_face: str = "rgb",
        normalize_face: bool = True,
        anti_spoofing: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Extract faces from the given image.
        See DeepFace.extract_faces() for detailed documentation.
        """
        try:
            self._log_resource_usage("Before face extraction")
            faces = DeepFace.extract_faces(
                img_path=img_path,
                detector_backend=detector_backend,
                enforce_detection=enforce_detection,
                align=align,
                expand_percentage=expand_percentage,
                grayscale=grayscale,
                color_face=color_face,
                normalize_face=normalize_face,
                anti_spoofing=anti_spoofing,
            )
            self._log_resource_usage("After face extraction")
            # Store result before cleanup
            result = faces.copy()  # Make a copy to be safe
            self.clear_memory()
            return result
        except Exception as e:
            logger.error(f"Error extracting faces: {str(e)}")
            raise
    def get_first_face_embedding(
        self,
        img_path: Union[str, np.ndarray],
        model_name: str = "Facenet512",
        detector_backend: str = "retinaface",
        enforce_detection: bool = False,
        align: bool = True,
        expand_percentage: int = 0,
        normalization: str = "Facenet",
    ) -> Optional[np.ndarray]:
        """
        Extract the first face from an image and return its embedding.

        This method takes an image input and performs the following steps:
        1. Detects faces in the image using the specified detector backend
        2. Takes the first detected face
        3. Generates an embedding vector for that face using the specified model

        Args:
            img_path: Either a string path to an image file or a numpy array containing the image data
            model_name: Name of the face embedding model to use (default: Facenet512)
            detector_backend: Which face detection model to use (default: retinaface)
            enforce_detection: Whether to raise an error if no face is found (default: False)
            align: Whether to align the detected face before generating the embedding (default: True)
            expand_percentage: How much to expand the detected face region by (default: 0)
            normalization: Type of normalization to apply to face pixels (default: Facenet)

        Returns:
            Optional[np.ndarray]: A numpy array containing the face embedding vector if a face is found,
            or None if no face is detected and enforce_detection is False

        Raises:
            Exception: If no face is detected and enforce_detection is True, or if there are other errors
            during face detection or embedding generation
        """
        try:
            # First extract faces
            faces = self.extract_faces(
                img_path=img_path,
                detector_backend=detector_backend,
                enforce_detection=enforce_detection,
                align=align,
                expand_percentage=expand_percentage,
            )
            if not faces:
                logger.warning("No faces detected in image")
                return None
            # Get embedding for the first face
            first_face = faces[0]
            embedding = self.get_embeddings(
                img_path=first_face['face'],  # Use the extracted face image
                model_name=model_name,
                enforce_detection=False,  # Face already detected
                detector_backend=detector_backend,
                align=False,  # Face already aligned
                normalization=normalization,
            )
            if isinstance(embedding, dict):  # {"resultado": 4}: no usable face
                return None
            result = embedding[0]['embedding']  # Just the embedding array
            self.clear_memory()
            return result
        except Exception as e:
            logger.error(f"Error getting first face embedding: {str(e)}")
            raise
    def is_face_frontal(self, img_path: Union[str, np.ndarray], threshold_angle: float = 30.0) -> int:
        """
        Check if the largest detected face is frontal.

        Args:
            img_path: Image path or numpy array
            threshold_angle: Maximum allowed head rotation angle in degrees (default: 30.0)

        Returns:
            int: 0 if the face is frontal, 1 if not frontal or no face detected
        """
        try:
            # If input is a string (file path), load the image
            if isinstance(img_path, str):
                img = cv2.imread(img_path)
                if img is None:
                    logger.error("Failed to load image")
                    return 1
            else:
                img = img_path
            mtcnn_detector = MTCNN()
            faces = mtcnn_detector.detect_faces(img)
            if not faces:
                logger.warning("No faces detected")
                return 1
            # Get the largest face by bounding-box area ('box' is [x, y, w, h])
            largest_face = max(faces, key=lambda f: f["box"][2] * f["box"][3])
            # Eye keypoints returned by MTCNN
            left_eye = largest_face["keypoints"]["left_eye"]
            right_eye = largest_face["keypoints"]["right_eye"]
            # Calculate the angle between the eyes
            dx = right_eye[0] - left_eye[0]
            dy = right_eye[1] - left_eye[1]
            angle = abs(np.degrees(np.arctan2(dy, dx)))
            # Check if face is frontal based on angle
            is_frontal = angle < threshold_angle
            return 0 if is_frontal else 1
        except Exception as e:
            logger.error(f"Error checking face orientation: {str(e)}")
            return 1
# app.py
from flask import Flask
from routes import register_routes
import logging
from database import DatabaseLoader
from model import ModelLoader
import os
import boto3


def create_app():
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler('app.log', mode='a')
        ]
    )
    # Configure AWS credentials
    aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
    aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    aws_region = os.environ.get('AWS_REGION', 'us-east-1')
    # Initialize AWS session
    if aws_access_key and aws_secret_key:
        boto3.setup_default_session(
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name=aws_region
        )
    app = Flask(__name__)
    try:
        # Initialize singletons
        DatabaseLoader()  # Initialize database connection
        ModelLoader()     # Load ML models
    except Exception as e:
        logging.error(f"Failed to initialize application components: {str(e)}")
        raise
    # Register routes
    register_routes(app)
    return app


# Add this for Gunicorn
application = create_app()

# Add this to run the app directly
if __name__ == '__main__':
    application.run(host='0.0.0.0', port=5000, debug=True)
Upvotes: 0
Views: 74
Reputation: 11
Regarding the Ray integration question, Ray Serve seems well suited to this use case: it serves online requests in parallel, with per-request computation. The library is a general framework for running multiple replicas of your request-handling logic, and it can scale out across a Ray cluster.
In addition, Ray Serve supports resource allocation, so you can specify the GPU resources each replica needs.
Upvotes: 1