Reputation: 667
I have this code running in a Cloud Run container in GCP
import os
import sys
import logging
import json
import time
import ray
import google.protobuf.json_format
from flask import Flask, request, jsonify
from google.events.cloud.firestore import DocumentEventData
# Configure logging
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def get_or_create_ray_connection():
"""Single function to handle Ray connection"""
try:
# Try to disconnect first to ensure clean state
try:
ray.disconnect()
logger.info("Cleaned up any existing Ray connection")
except:
pass
# Now try to connect
creds_path = "./ray-gcs-sa.json"
if not os.path.exists(creds_path):
logger.error(f"❌ Service account key not found at: {creds_path}")
return False
with open(creds_path, "r") as f:
credentials_content = f.read()
logger.info(f"🔄 Starting Ray connection at {time.strftime('%Y-%m-%d %H:%M:%S')}")
worker_id = f"worker_{os.getpid()}_{time.time_ns()}"
ray.init(
address="ray://xxxxxxxxxxxxxxxx",
namespace=worker_id,
runtime_env={
"pip": ["google-cloud-storage>=2.0.0"],
"env_vars": {
"GOOGLE_APPLICATION_CREDENTIALS_CONTENT": credentials_content
}
},
logging_level=logging.INFO
)
# Test the connection
@ray.remote
def ping():
return "Connection successful"
result = ray.get(ping.remote(), timeout=5)
logger.info(f"✨ Ray connection verified: {result}")
return True
except Exception as e:
logger.error(f"❌ Ray connection error: {str(e)}")
return False
def extract_user_query(doc_json):
"""Extract user query from Firestore document"""
try:
doc_dict = json.loads(doc_json)
value_dict = doc_dict.get('value', {})
fields = value_dict.get('fields', {})
user_query = fields.get('user_query', {}).get('stringValue', '')
logger.info(f"🔍 Extracted user query: {user_query}")
return user_query
except Exception as e:
logger.error(f"❌ Error extracting user query: {e}", exc_info=True)
raise
app = Flask(__name__)
@app.route("/", methods=["GET"])
def home():
"""Health check endpoint"""
logger.info("🏥 Health check endpoint called")
return "Firestore Listener is running!", 200
@app.route("/trigger", methods=["POST"])
def trigger():
"""Receives Firestore events and manages Ray connection"""
try:
logger.info("🔹 Received request at /trigger")
raw_body = request.get_data()
logger.info(f"📜 Raw Protobuf data length: {len(raw_body)} bytes")
if not raw_body:
logger.warning("⚠️ Empty request body received")
return jsonify({"status": "error", "message": "Empty request body"}), 400
# Parse Protobuf message
doc_event = DocumentEventData()
doc_event._pb.ParseFromString(raw_body)
doc_json = google.protobuf.json_format.MessageToJson(doc_event._pb)
logger.info(f"✅ Parsed DocumentEventData JSON: {doc_json}")
# Extract user query
new_user_query = extract_user_query(doc_json)
logger.info(f"📝 New user query stored: {new_user_query}")
# Single connection attempt
logger.info("🔄 Setting up Ray connection...")
connection_active = get_or_create_ray_connection()
response_data = {
"status": "success",
"message": "Event received and processed",
"user_query": new_user_query,
"ray_connected": connection_active
}
logger.info(f"✨ Request processed. Ray connection status: {connection_active}")
return jsonify(response_data), 200
except Exception as e:
error_msg = f"❌ Error processing request: {str(e)}"
logger.error(error_msg, exc_info=True)
return jsonify({
"status": "error",
"message": error_msg
}), 500
finally:
try:
ray.disconnect()
logger.info("🔌 Ray connection cleaned up")
except:
pass
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8081))
logger.info(f"🚀 Starting Flask app on port {port}")
app.run(host="0.0.0.0", port=port)
Most of the code works but connecting to ray is giving me this error
ERROR - ❌ Ray connection error: ray.init() called, but ray client is already connected"
I want to use that same connection if it is already connected but no matter what I do it keept trying to do ray.init(), how can I use that active connection in Ray
this is my Dockerfile
FROM python:3.9.21
WORKDIR /app
# Install Java (required for PySpark)
RUN apt-get update && \
apt-get install -y default-jdk && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set environment variables
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH=$PATH:$JAVA_HOME/bin
ENV PYTHONUNBUFFERED=1
ENV PORT=8081
# Create directory for credentials
RUN mkdir -p ./ray_env_3921/gcp_credentials
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt && \
pip install "ray[client]==2.7.0"
# Copy service account key first to ensure it exists
COPY ray-gcs-sa.json ./
# Copy application code
COPY . .
# Set proper permissions for service account key
RUN chmod 600 ./ray-gcs-sa.json
# Run with Gunicorn with logging configuration
CMD exec gunicorn \
--bind :$PORT \
--workers 1 \
--threads 8 \
--log-level debug \
--access-logfile - \
--error-logfile - \
--capture-output \
main:app
Upvotes: 0
Views: 17