gpu-try-deepface

Reputation: 1

How to Parallelize a Flask App with Gunicorn and Distribute GPU Usage Among Workers?

I am building a Flask app to handle facial embeddings using DeepFace. My goal is to serve approximately 50 clients, with an estimated 10 requests per minute. Each request involves running deepface.represent() to process an image, which requires GPU resources.

Here’s my setup:

- app.py: Main Flask app.
- model.py: Loads the DeepFace models and includes the represent function.
- database.py: Handles database interactions.
- wsgi.py: Entry point for the Gunicorn server.
- gunicorn.conf: Gunicorn configuration file.
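
For reference, I launch the server along these lines (my approximation of the command; wsgi:app matches the app object created in wsgi.py below):

gunicorn -c gunicorn.conf wsgi:app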

I want to run the app with 5 Gunicorn workers, each using a different logical GPU. However, I am facing challenges with configuring GPU division:

GPU Logical Devices: The creation of logical GPU devices in TensorFlow (e.g., tf.config.experimental.set_virtual_device_configuration) needs to be tied to individual workers. Placing this configuration in model.py (where the models are loaded) doesn't seem to work: all workers end up sharing the same GPU resources.
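For context, here is the kind of per-worker configuration I have in mind, as a minimal, untested sketch; the post_fork hook is Gunicorn's standard per-worker callback, but the 2048 MB memory slice and the use of worker.age as a worker index are my own guesses:

# gunicorn.conf -- sketch only
workers = 5

def post_fork(server, worker):
    import os
    # worker.age increments for every worker the arbiter spawns, so it can
    # serve as a crude per-worker index (assumption on my part)
    os.environ["WORKER_INDEX"] = str((worker.age - 1) % workers)

    # Configure TensorFlow after the fork but before model.py is imported,
    # so each worker initializes its own logical GPU with a fixed memory slice
    import tensorflow as tf
    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        tf.config.set_logical_device_configuration(
            gpus[0],
            [tf.config.LogicalDeviceConfiguration(memory_limit=2048)],
        )

The idea is that each worker process would then see only its own memory-capped logical device instead of the whole card.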

Worker Parallelization: Should I use synchronous (sync) or asynchronous (async) workers in Gunicorn for this type of workload? I aim to ensure efficient parallelization without overloading the GPU.

Ray Integration: I’m considering using Ray to manage parallel processing. If Ray is necessary, which parts of the app would you recommend integrating it with?

Key Question:

How can I configure Gunicorn workers to each use a separate logical GPU device, ensuring that the deepface.represent() function runs in parallel without resource contention? Any advice on where the GPU division logic should reside or how to structure the app to achieve this would be greatly appreciated.

Below is my code (wsgi.py, model.py, and app.py, in that order):

# wsgi.py
from app import create_app
import os
import boto3  # pip install boto3

# Add AWS credentials loading
session = boto3.Session()
# This will automatically load credentials from the default chain
# (environment variables, ~/.aws/credentials, or IAM role)

app = create_app()
app.secret_key = os.urandom(24)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

# model.py
import json
import time
import logging
from typing import Any, Dict, List, Optional, Union

import cv2
import numpy as np
import psutil
import GPUtil
import tensorflow as tf
from deepface import DeepFace
from deepface.modules import modeling

logger = logging.getLogger(__name__)


class ModelLoader:
    """
    Singleton class for loading and managing face detection and recognition models.
    This class ensures models are loaded only once and shared across the application.
    """
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ModelLoader, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        """Initialize model attributes and load models."""
        # Load memory configuration
        self.config = self.load_config()
        # self.gpu_memory_limit = self.config.get('gpu_memory_limit', 1024)  # Default: 1 GB
        
        # Create or open GPU log file
        self.gpu_log_file = "gpu.log"
        with open(self.gpu_log_file, "a") as f:
            f.write("\n--- New Session Started ---\n")
        
        self.facenet512_model = None
        self.retinaface_model = None
        self.mtcnn = None
        self.opencv = None
        self.load_models()

    def load_config(self):
        """Load configurations from config.json file."""
        try:
            with open("config.json", "r") as file:
                config = json.load(file)
            return config
        except FileNotFoundError:
            # Return the default configuration if the file doesn't exist
            return {
                "gpu_memory_limit": 1024,  # 1GB
                "allow_growth": True
            }

    def get_model(self, model_name):
        """
        Get a specific model by name.
        
        Args:
            model_name (str): Name of the model to retrieve.
        
        Returns:
            The requested model instance.
            
        Raises:
            ValueError: If the requested model is not found.
        """
        if model_name == "Facenet512":
            return self.facenet512_model
        else:
            raise ValueError(f"Model {model_name} not found.")
            
    def load_models(self):
        """Load all required models if they haven't been loaded already."""
        if self.facenet512_model is None:
            start_time = time.time()
            logger.info("Loading Facenet512 model...")
            
            try:
                self.facenet512_model = DeepFace.build_model("Facenet512")
                self.retinaface_model = modeling.build_model(task="face_detector", model_name="retinaface")
                self.mtcnn = modeling.build_model(task="face_detector", model_name="mtcnn")
                self.opencv = modeling.build_model(task="face_detector", model_name="opencv")
                
                end_time = time.time()
                logger.info(f"Facenet512 model loaded in {end_time - start_time:.2f} seconds.")
            except Exception as e:
                logger.error(f"Error loading models: {str(e)}")
                raise

    def clear_memory(self):
        """Clear memory for TensorFlow only (no GPU operations)."""
        try:
            tf.keras.backend.clear_session()
        except ImportError:
            pass

    def _log_resource_usage(self, operation: str):
        """Log GPU and CPU memory usage to gpu.log file."""
        try:
            # Get CPU usage
            cpu_percent = psutil.cpu_percent()
            ram_percent = psutil.virtual_memory().percent
            
            # Get GPU usage if available
            gpu_info = ""
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_info += f"GPU {gpu.id}: Memory Use {gpu.memoryUsed}MB/{gpu.memoryTotal}MB ({gpu.memoryUtil*100:.1f}%) "
            
            # Create log entry
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            log_entry = f"[{timestamp}] {operation} - CPU: {cpu_percent}%, RAM: {ram_percent}%, {gpu_info}\n"
            
            # Write to log file
            with open(self.gpu_log_file, "a") as f:
                f.write(log_entry)
                
        except Exception as e:
            logger.error(f"Error logging resource usage: {str(e)}")

    def _is_frontal_face(self, facial_area: Dict[str, Any], threshold: float = 0.1) -> bool:
        """
        Check if the face is frontal based on eye positions and face area.
        
        Args:
            facial_area: Dictionary containing face and eye coordinates
            threshold: Tolerance threshold for determining if face is frontal
            
        Returns:
            bool: True if face is frontal, False otherwise
        """
        try:
            left_eye = facial_area["left_eye"]
            right_eye = facial_area["right_eye"]
            face_x, face_y, face_w, face_h = facial_area["x"], facial_area["y"], facial_area["w"], facial_area["h"]

            # Calculate horizontal distance between eyes
            eye_distance = np.linalg.norm(np.array(left_eye) - np.array(right_eye))
            
            # Calculate ratio between eye distance and face width
            eye_face_width_ratio = eye_distance / face_w
            
            # Calculate vertical eye symmetry
            eye_height_difference = abs(left_eye[1] - right_eye[1]) / face_h
            
            return eye_height_difference < threshold and 0.3 < eye_face_width_ratio < 0.6
            
        except Exception as e:
            logger.error(f"Error checking frontal face: {str(e)}")
            return False

    def get_embeddings(
        self,
        img_path: Union[str, np.ndarray],
        model_name: str = "Facenet512",
        enforce_detection: bool = False,
        detector_backend: str = "mtcnn",
        align: bool = True,
        expand_percentage: int = 0,
        normalization: str = "Facenet",
        anti_spoofing: bool = False,
        max_faces: Optional[int] = 1,
    ) -> Union[List[Dict[str, Any]], Dict[str, int]]:
        """
        Get facial embeddings using the specified model.
        Returns {"resultado": 4} if no face is detected or the detected face is not frontal.
        """
        try:
            start_time = time.time()
            self._log_resource_usage("Before embeddings generation")
            
            embeddings = DeepFace.represent(
                img_path=img_path,
                model_name=model_name,
                enforce_detection=enforce_detection,
                detector_backend=detector_backend,
                align=align,
                expand_percentage=expand_percentage,
                normalization=normalization,
                anti_spoofing=anti_spoofing,
                max_faces=max_faces,
            )
            
            if not embeddings:
                logger.warning("No faces detected")
                return {"resultado": 4}
            
            # Check if face is frontal
            if not self._is_frontal_face(embeddings[0]["facial_area"]):
                logger.warning("Face is not frontal")
                return {"resultado": 4}
            
            end_time = time.time()
            self._log_resource_usage("After embeddings generation")
            
            # Store result before cleanup
            result = embeddings.copy()
            self.clear_memory()
            return result
            
        except Exception as e:
            logger.error(f"Error getting embeddings: {str(e)}")
            raise

    def extract_faces(
        self,
        img_path: Union[str, np.ndarray],
        detector_backend: str = "mtcnn",
        enforce_detection: bool = True,
        align: bool = True,
        expand_percentage: int = 30,
        grayscale: bool = False,
        color_face: str = "rgb",
        normalize_face: bool = True,
        anti_spoofing: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Extract faces from the given image.
        See DeepFace.extract_faces() for detailed documentation.
        """
        try:
            self._log_resource_usage("Before face extraction")
            
            faces = DeepFace.extract_faces(
                img_path=img_path,
                detector_backend=detector_backend,
                enforce_detection=enforce_detection,
                align=align,
                expand_percentage=expand_percentage,
                grayscale=grayscale,
                color_face=color_face,
                normalize_face=normalize_face,
                anti_spoofing=anti_spoofing,
            )
            
            self._log_resource_usage("After face extraction")
            # Store result before cleanup
            result = faces.copy()  # Make a copy to be safe
            self.clear_memory()
            return result
        except Exception as e:
            logger.error(f"Error extracting faces: {str(e)}")
            raise

    def get_first_face_embedding(
        self,
        img_path: Union[str, np.ndarray],
        model_name: str = "Facenet512", 
        detector_backend: str = "retinaface",
        enforce_detection: bool = False,
        align: bool = True,
        expand_percentage: int = 0,
        normalization: str = "Facenet",
    ) -> Optional[np.ndarray]:
        """
        Extract the first face from an image and return its embedding.

        This method takes an image input and performs the following steps:
        1. Detects faces in the image using the specified detector backend
        2. Takes the first detected face
        3. Generates an embedding vector for that face using the specified model

        Args:
            img_path: Either a string path to an image file or a numpy array containing the image data
            model_name: Name of the face embedding model to use (default: Facenet512)
            detector_backend: Which face detection model to use (default: retinaface)
            enforce_detection: Whether to raise an error if no face is found (default: False)
            align: Whether to align the detected face before generating embedding (default: True)
            expand_percentage: How much to expand the detected face region by (default: 0)
            normalization: Type of normalization to apply to face pixels (default: Facenet)

        Returns:
            Optional[np.ndarray]: A numpy array containing the face embedding vector if a face is found,
                                or None if no face is detected and enforce_detection is False

        Raises:
            Exception: If no face is detected and enforce_detection is True, or if there are other errors
                      during face detection or embedding generation
        """
        try:
            # First extract faces
            faces = self.extract_faces(
                img_path=img_path,
                detector_backend=detector_backend,
                enforce_detection=enforce_detection,
                align=align,
                expand_percentage=expand_percentage,
            )
            
            if not faces:
                logger.warning("No faces detected in image")
                return None
                
            # Get embedding for the first face
            first_face = faces[0]
            embedding = self.get_embeddings(
                img_path=first_face['face'],  # Use the extracted face image
                model_name=model_name,
                enforce_detection=False,  # Face already detected
                detector_backend=detector_backend,
                align=False,  # Face already aligned
                normalization=normalization,
            )
            
            result = embedding[0]['embedding']  # keep just the embedding vector
            self.clear_memory()
            return result
        except Exception as e:
            logger.error(f"Error getting first face embedding: {str(e)}")
            raise

    def is_face_frontal(self, img_path: Union[str, np.ndarray], threshold_angle: float = 30.0) -> int:
        """
        Check if the largest detected face is frontal.
        
        Args:
            img_path: Image path or numpy array
            threshold_angle: Maximum allowed head rotation angle in degrees (default: 30.0)
        
        Returns:
            int: 0 if the face is frontal, 1 if not frontal or no face detected
        """
        try:
            # If input is a string (file path), load the image
            if isinstance(img_path, str):
                img = cv2.imread(img_path)
                if img is None:
                    logger.error("Failed to load image")
                    return 1
            else:
                img = img_path

            # Reuse the MTCNN detector preloaded in load_models()
            faces = self.mtcnn.detect_faces(img)
            
            if not faces:
                logger.warning("No faces detected")
                return 1
                
            # Get the largest face based on area
            largest_face = max(faces, key=lambda x: x.w * x.h)
            
            # Calculate the angle between eyes
            left_eye = largest_face.left_eye
            right_eye = largest_face.right_eye
            
            # Calculate angle
            dx = right_eye[0] - left_eye[0]
            dy = right_eye[1] - left_eye[1]
            angle = abs(np.degrees(np.arctan2(dy, dx)))
            
            # Check if face is frontal based on angle
            is_frontal = angle < threshold_angle
            
            return 0 if is_frontal else 1
            
        except Exception as e:
            logger.error(f"Error checking face orientation: {str(e)}")
            return 1

# app.py
from flask import Flask
from routes import register_routes
import logging
from database import DatabaseLoader
from model import ModelLoader
import os
import boto3

def create_app():
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler('app.log', mode='a')
        ]
    )

    # Configure AWS credentials
    aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
    aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    aws_region = os.environ.get('AWS_REGION', 'us-east-1')

    # Initialize AWS session
    if aws_access_key and aws_secret_key:
        boto3.setup_default_session(
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name=aws_region
        )
    
    app = Flask(__name__)
    
    try:
        # Initialize singletons
        DatabaseLoader()  # Initialize database connection
        ModelLoader()    # Load ML models
    except Exception as e:
        logging.error(f"Failed to initialize application components: {str(e)}")
        raise
    
    # Register routes
    register_routes(app)
    
    return app

# Add this for Gunicorn
application = create_app()

# Add this to run the app directly
if __name__ == '__main__':
    application.run(host='0.0.0.0', port=5000, debug=True)

Upvotes: 0

Views: 74

Answers (1)

myan

Reputation: 11

Regarding the Ray integration question, I think Ray Serve could be a good fit for this use case: it is designed to serve online requests in parallel, including ones that involve real computation. The library is a general framework for running multiple replicas of your request-handling logic, and it can scale out across a Ray cluster.

In addition, Ray Serve supports resource allocation, so you should be able to assign the necessary GPU resources to each replica.
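For illustration, here is a rough, untested sketch of a Ray Serve deployment that reserves a GPU fraction per replica; the 0.2 value, the EmbeddingDeployment class, and the request payload shape are my assumptions rather than something from the original app:

from ray import serve
from deepface import DeepFace

# Five replicas, each asking the Ray scheduler for 0.2 of a GPU, so all
# five can be co-located on one physical device
@serve.deployment(num_replicas=5, ray_actor_options={"num_gpus": 0.2})
class EmbeddingDeployment:
    def __init__(self):
        # Built once per replica process
        self.model = DeepFace.build_model("Facenet512")

    async def __call__(self, request):
        payload = await request.json()
        return DeepFace.represent(
            img_path=payload["img_path"],
            model_name="Facenet512",
        )

serve.run(EmbeddingDeployment.bind())

Note that a fractional num_gpus is a scheduling hint: Ray co-locates the replicas but does not partition GPU memory, so each replica would still need to cap its own memory use (for example via TensorFlow's logical device configuration).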

Upvotes: 1
