Usman Hussain
Usman Hussain

Reputation: 187

Singleton Pulsar Producer Using Python Client Not Working

I'm struggling with creating a singleton Apache Pulsar producer in a Django app. I have a Django application that needs to produce hundreds of messages every second. Instead of creating a new producer every time, I want to reuse the same producer that was created the first time.

The problem I'm facing is that the producer gets successfully created, I can see the logs. but if I try to send a message using the producer, it gets stuck without any error. Below is my code for creating the producer.

import logging
import os
import threading

import pulsar

from sastaticketpk.settings.config import PULSAR_ENABLED

logger = logging.getLogger(__name__)

PULSAR_ENV = os.environ.get("PULSAR_CONTAINER")

class PulsarClient:
    __producer = None
    __lock = threading.Lock()  # A lock to ensure thread-safe initialization of the singleton producer

    def __init__(self):
        """ Virtually private constructor. """
        raise RuntimeError("Call get_producer() instead")

    @classmethod
    def initialize_producer(cls):
        if PULSAR_ENABLED and PULSAR_ENV:
            logger.info("Creating pulsar producer")
            client = pulsar.Client(
                "pulsar://k8s-tooling-pulsarpr-7.elb.ap-southeast-1.amazonaws.com:6650"
            )
            # Create the producer
            try:
                cls.__producer = client.create_producer(
                    "persistent://public/default/gaf",
                    send_timeout_millis=1000
                )
                logger.info("Producer created successfully")
            except pulsar._pulsar.TopicNotFound as e:
                logger.error(f"Error creating producer: {e}")
            except Exception as e:
                logger.error(f"Error creating producer: {e}")

    @classmethod
    def get_producer(cls):
        logger.info(f"Producer value {cls.__producer}")
        if cls.__producer is None:
            with cls.__lock:
                if cls.__producer is None:
                    cls.initialize_producer()
        logger.info(f"Is producer connected {cls.__producer}")
        return cls.__producer

I'm importing it in one of the utility files that runs at the startup of the application. Below is the code for calling:

PULSAR_PRODUCER = PulsarClient.get_producer()

The above code works fine on my local setup, but when the same code is deployed on production (i.e., AWS ECS), it doesn't work. If I SSH into the Docker container and then run a Python shell and try to import PULSAR_PRODUCER, it shows as None. Please suggest what I might be missing, as I'm really stuck in understanding.

Upvotes: 0

Views: 120

Answers (0)

Related Questions