Juan Lozano
Juan Lozano

Reputation: 667

ray.init() called, but ray client is already connected error in a cloud run

I have this code running in a Cloud Run container in GCP

import os
import sys
import logging
import json
import time
import ray
import google.protobuf.json_format
from flask import Flask, request, jsonify
from google.events.cloud.firestore import DocumentEventData

# Configure logging
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def get_or_create_ray_connection():
    """Single function to handle Ray connection"""
    try:
        # Try to disconnect first to ensure clean state
        try:
            ray.disconnect()
            logger.info("Cleaned up any existing Ray connection")
        except:
            pass

        # Now try to connect
        creds_path = "./ray-gcs-sa.json"
        if not os.path.exists(creds_path):
            logger.error(f"❌ Service account key not found at: {creds_path}")
            return False
            
        with open(creds_path, "r") as f:
            credentials_content = f.read()
        
        logger.info(f"🔄 Starting Ray connection at {time.strftime('%Y-%m-%d %H:%M:%S')}")
        worker_id = f"worker_{os.getpid()}_{time.time_ns()}"
        
        ray.init(
            address="ray://xxxxxxxxxxxxxxxx",
            namespace=worker_id,
            runtime_env={
                "pip": ["google-cloud-storage>=2.0.0"],
                "env_vars": {
                    "GOOGLE_APPLICATION_CREDENTIALS_CONTENT": credentials_content
                }
            },
            logging_level=logging.INFO
        )
        
        # Test the connection
        @ray.remote
        def ping():
            return "Connection successful"
        
        result = ray.get(ping.remote(), timeout=5)
        logger.info(f"✨ Ray connection verified: {result}")
        return True

    except Exception as e:
        logger.error(f"❌ Ray connection error: {str(e)}")
        return False

def extract_user_query(doc_json):
    """Extract user query from Firestore document"""
    try:
        doc_dict = json.loads(doc_json)
        value_dict = doc_dict.get('value', {})
        fields = value_dict.get('fields', {})
        user_query = fields.get('user_query', {}).get('stringValue', '')
        logger.info(f"🔍 Extracted user query: {user_query}")
        return user_query
    except Exception as e:
        logger.error(f"❌ Error extracting user query: {e}", exc_info=True)
        raise

app = Flask(__name__)

@app.route("/", methods=["GET"])
def home():
    """Health check endpoint"""
    logger.info("🏥 Health check endpoint called")
    return "Firestore Listener is running!", 200

@app.route("/trigger", methods=["POST"])
def trigger():
    """Receives Firestore events and manages Ray connection"""
    try:
        logger.info("🔹 Received request at /trigger")
        raw_body = request.get_data()
        logger.info(f"📜 Raw Protobuf data length: {len(raw_body)} bytes")
        
        if not raw_body:
            logger.warning("⚠️ Empty request body received")
            return jsonify({"status": "error", "message": "Empty request body"}), 400

        # Parse Protobuf message
        doc_event = DocumentEventData()
        doc_event._pb.ParseFromString(raw_body)
        doc_json = google.protobuf.json_format.MessageToJson(doc_event._pb)
        logger.info(f"✅ Parsed DocumentEventData JSON: {doc_json}")

        # Extract user query
        new_user_query = extract_user_query(doc_json)
        logger.info(f"📝 New user query stored: {new_user_query}")

        # Single connection attempt
        logger.info("🔄 Setting up Ray connection...")
        connection_active = get_or_create_ray_connection()
        
        response_data = {
            "status": "success",
            "message": "Event received and processed",
            "user_query": new_user_query,
            "ray_connected": connection_active
        }

        logger.info(f"✨ Request processed. Ray connection status: {connection_active}")
        return jsonify(response_data), 200

    except Exception as e:
        error_msg = f"❌ Error processing request: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return jsonify({
            "status": "error",
            "message": error_msg
        }), 500
    finally:
        try:
            ray.disconnect()
            logger.info("🔌 Ray connection cleaned up")
        except:
            pass

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8081))
    logger.info(f"🚀 Starting Flask app on port {port}")
    app.run(host="0.0.0.0", port=port)

Most of the code works but connecting to ray is giving me this error

ERROR - ❌ Ray connection error: ray.init() called, but ray client is already connected"

I want to use that same connection if it is already connected but no matter what I do it keept trying to do ray.init(), how can I use that active connection in Ray

this is my Dockerfile

FROM python:3.9.21

WORKDIR /app

# Install Java (required for PySpark)
RUN apt-get update && \
    apt-get install -y default-jdk && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set environment variables
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH=$PATH:$JAVA_HOME/bin
ENV PYTHONUNBUFFERED=1
ENV PORT=8081

# Create directory for credentials
RUN mkdir -p ./ray_env_3921/gcp_credentials

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt && \
    pip install "ray[client]==2.7.0"

# Copy service account key first to ensure it exists
COPY ray-gcs-sa.json ./

# Copy application code
COPY . .

# Set proper permissions for service account key
RUN chmod 600 ./ray-gcs-sa.json

# Run with Gunicorn with logging configuration
CMD exec gunicorn \
    --bind :$PORT \
    --workers 1 \
    --threads 8 \
    --log-level debug \
    --access-logfile - \
    --error-logfile - \
    --capture-output \
    main:app

Upvotes: 0

Views: 17

Answers (0)

Related Questions