code_flipper
code_flipper

Reputation: 1

Flask Python Server Sent Event (gunicorn)

I am new to developing Flask apps. The purpose of this Flask app is to insert the entry matching the current time from admin_table into home_table every 2 minutes, and to display the table and the most recently inserted row on index.html. Each time a new row is inserted into home_table, it triggers a 40-second video popup and some animations.

  1. But this flask app fails in production when using gunicorn.

  2. It is unable to insert data into the MySQL database.

  3. Users do not insert any data into the database; they can only view it and search for a matching date.

  4. Sessions are used only for the single admin.

I don't want to use Flask-SocketIO, so I am using Server-Sent Events through the @app.route('/sse') endpoint.

System :Ubuntu 22.04 LTS Cores :2

Memory:8GB (RAM)

Python: 3.10

Username is:Lott

Flask app folder: /home/Lott/game [app.py, connect_db.py, wsgi.py]

Static files are at:/home/Lott/game/static. [css, js, video, audio]

Templates files are at:/home/Lott/game/templates. [index.html]

I also created /home/Lott/game/wsgi.py for pointing gunicorn.

Please guide me for deploying this app.

Everything works awesome on local environment.

But when I run `gunicorn --worker-class gevent -w 4 -b 0.0.0.0:5000 app:app`, it doesn't insert any data.

gunicorn is not triggering SSE

import json
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, flash, send_from_directory, abort, Response
from apscheduler.schedulers.background import BackgroundScheduler
import connect_db
import random
import pytz
import os
from datetime import datetime, timedelta
from functools import wraps
import time

app = Flask(__name__)
# The secret key signs the session cookie, so it has to be identical in every
# worker process and stable across restarts. A key generated with os.urandom()
# at import time is different in each gunicorn worker, which makes a session
# created by one worker invalid in the others. Read the key from the
# SECRET_KEY environment variable; the random fallback is only suitable for
# single-process local development.
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(24)


# Function to update home_table from winners table
def update_home_table():
    """Copy admin_table rows for the current IST minute into home_table.

    Looks up the admin_table rows whose ``date``/``time`` columns equal the
    current time in Asia/Kolkata (formatted ``dd/mm/YYYY`` and ``HH:MM AM/PM``),
    inserts every distinct row that has no home_table entry for the same
    date and time yet, commits, and then queues one SSE message per inserted
    row on every queue registered in the module-level ``clients`` list.

    Messages are queued only after the commit succeeded, so clients are never
    told about a row that was not actually stored. MySQL errors are printed
    and swallowed; the connection is always handed back via ``close()``.
    """
    print("update_home_table function called")
    mydb = connect_db.connect_fun()
    if not mydb:
        print("Database connection failed.")
        return

    print("Database connection successful")
    pending_messages = []  # SSE payloads for rows inserted in this run
    try:
        cursor = mydb.cursor(dictionary=True)
        ist = pytz.timezone('Asia/Kolkata')
        now = datetime.now(ist)
        today = now.strftime('%d/%m/%Y')
        current_time = now.strftime('%I:%M %p')
        cursor.execute("SELECT * FROM admin_table WHERE date = %s AND time = %s", (today, current_time))
        admin_data = cursor.fetchall()

        # A set drops admin rows that are exact duplicates of each other.
        column_names = [f'col{i}' for i in range(1, 11)]
        unique_entries = {
            (row['date'], row['time'], *(row[name] for name in column_names))
            for row in admin_data
        }

        # Loop names avoid shadowing the imported ``time`` module.
        for entry_date, entry_time, *columns in unique_entries:
            # LIMIT 1 guarantees fetchone() consumes the whole result set;
            # leftover unread rows would make the next execute() fail.
            cursor.execute(
                "SELECT 1 FROM home_table WHERE date = %s AND time = %s LIMIT 1",
                (entry_date, entry_time),
            )
            if cursor.fetchone():
                continue
            cursor.execute("""
                    INSERT INTO home_table (date, time, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """, (entry_date, entry_time, *columns))
            pending_messages.append(
                f"data: {json.dumps({'message': 'New row added to home_table', 'recentNumbers': columns})}\n\n"
            )
        mydb.commit()
    except connect_db.mysql.connector.Error as e:
        print(f"Error updating home_table: {e}")
    else:
        print("Data Inserted: ", len(pending_messages))
        # Broadcast to all connected clients. Iterate over a snapshot because
        # request handlers add and remove queues concurrently.
        for client in list(clients):
            client.extend(pending_messages)
    finally:
        mydb.close()


# Registry of per-connection message queues (plain lists): sse() appends one
# queue when a client connects, and update_home_table() appends an
# SSE-formatted message to each registered queue.
# NOTE: this is module-level state, so every worker process has its own
# separate list.
clients = []  # List to hold connected clients

@app.route('/sse')
def sse():
    """Stream Server-Sent Events to one client.

    Registers a private message queue (a list) in the module-level ``clients``
    registry, then yields every message that gets appended to it. While the
    queue is empty the generator polls once per second and emits an SSE
    comment line as a heartbeat every ``heartbeat_seconds`` so that a closed
    connection is noticed on the next write and idle proxies keep the stream
    open. The queue is unregistered when the generator is closed.

    Returns:
        A streaming ``Response`` with mimetype ``text/event-stream``.
    """
    heartbeat_seconds = 15

    def generate():
        # Create a new client queue
        queue = []
        clients.append(queue)  # Add this client's queue to the list
        idle_seconds = 0
        try:
            while True:
                if queue:
                    idle_seconds = 0
                    yield queue.pop(0)  # Oldest message first
                    continue
                time.sleep(1)  # Sleep briefly to avoid busy waiting
                idle_seconds += 1
                if idle_seconds >= heartbeat_seconds:
                    idle_seconds = 0
                    # Lines starting with ':' are comments in the SSE format
                    # and are ignored by EventSource clients.
                    yield ": keep-alive\n\n"
        finally:
            # Remove by identity: list.remove() compares by value, and every
            # idle client's queue is an equal empty list, so it could drop a
            # different client's queue instead of this one.
            for index, registered in enumerate(clients):
                if registered is queue:
                    del clients[index]
                    break

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',  # the stream must never be cached
            'X-Accel-Buffering': 'no',    # ask nginx not to buffer the stream
        },
    )


@app.route('/')
def home():
    """Render index.html with today's home_table rows.

    Fetches the rows whose ``date`` equals today's date in Asia/Kolkata
    (``dd/mm/YYYY``), newest time first. When no database connection is
    available the page is rendered with an empty table and a
    "not connected" message.
    """
    mydb = connect_db.connect_fun()
    if not mydb:
        return render_template(
            'index.html',
            home_msg="Not Connected to Home Database",
            home_table_data=[],
        )

    try:
        cursor = mydb.cursor(dictionary=True)
        # Get today's date in the format used in your database
        ist = pytz.timezone('Asia/Kolkata')
        today_date = datetime.now(ist).strftime('%d/%m/%Y')
        # Query to fetch only today's entries
        cursor.execute("SELECT * FROM home_table WHERE date = %s ORDER BY STR_TO_DATE(time, '%h:%i %p') DESC, CAST(id AS SIGNED) DESC", (today_date,))
        home_table_data = cursor.fetchall()
    finally:
        # Always hand the connection back, even if the query raised;
        # otherwise each failed request permanently takes one connection
        # out of the small pool.
        mydb.close()

    return render_template(
        'index.html',
        home_msg="Successfully Connected to Home Database",
        home_table_data=home_table_data,
    )


if __name__ == '__main__':
    # NOTE: this block only runs when the file is executed directly
    # (`python app.py`). A WSGI server that imports the module (for example
    # `gunicorn app:app`) never enters it, so the scheduler below is not
    # started there and update_home_table() is never called.
    scheduler = BackgroundScheduler()
    ist = pytz.timezone('Asia/Kolkata')
    today_date = datetime.now().strftime("%d/%m/%Y")
    if today_date == '22/12/2024':
        raise SystemExit('Contact to the Developer to resolve issue')

    def schedule_task():
        """Scheduler job: log the run time and refresh home_table."""
        now = datetime.now(ist)
        print(f"Scheduler executed at: {now.strftime('%I:%M %p')}")
        print("Calling update_home_table function")  # Debugging statement
        update_home_table()

    # Calculate time to start the scheduler at the next even minute. The
    # clock is read once: reading it twice could straddle a minute boundary
    # and put every later run on odd minutes.
    now = datetime.now(ist)
    next_run_time = now.replace(second=0, microsecond=0) + timedelta(minutes=2 - now.minute % 2)
    print(f"Scheduler will start at: {next_run_time.strftime('%I:%M %p')}")
    scheduler.add_job(schedule_task, 'interval', minutes=2, next_run_time=next_run_time)
    scheduler.start()
    print("Scheduler started")
    # The debug reloader re-executes this module in a child process, which
    # would start a second scheduler and run every job twice; disable it.
    app.run(debug=True, host='0.0.0.0', use_reloader=False)



# connect_db.py
import mysql.connector
from mysql.connector import pooling

# Database connection pool configuration
# Each setting can be overridden with an environment variable so production
# credentials do not have to live in the source tree; the defaults are the
# previous hard-coded values.
# NOTE(review): the fallback password should be removed once the deployment
# sets DB_PASSWORD.
import os

dbconfig = {
    "host": os.environ.get("DB_HOST", "localhost"),
    "user": os.environ.get("DB_USER", "root"),
    "password": os.environ.get("DB_PASSWORD", "sql@MACOS"),
    "database": os.environ.get("DB_NAME", "TestLott_12Nov"),
}

# Global connection pool
pool = pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=5,  # Number of connections in the pool
    **dbconfig
)

def connect_fun():
    """Borrow a connection from the module-level pool.

    Returns:
        A connected pooled connection, or ``None`` when the pool could not
        provide one or the connection it provided is not connected. Callers
        must call ``close()`` on the returned object to give it back to the
        pool.
    """
    try:
        # Get a connection from the pool
        mydb = pool.get_connection()
        if mydb.is_connected():
            return mydb
        print("Connection is not valid")
        # Hand the unusable connection back instead of dropping it, so the
        # pool does not permanently lose one of its slots.
        mydb.close()
        return None
    except mysql.connector.Error as e:
        print(f"Error connecting to MySQL: {e}")
        return None

Upvotes: 0

Views: 17

Answers (0)

Related Questions