Mayank Soni
Mayank Soni

Reputation: 1

Kafka gives error ECONNREFUSED ::1:9002\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1495:16)"}

Registering event handlers
hii
Registering event handlers
hii
Connecting Kafka admin...

Server is running on port 9001
{"level":"ERROR","timestamp":"2024-08-09T21:22:29.409Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED ::1:9002","broker":"localhost:9002","clientId":"user-service","stack":"Error: connect ECONNREFUSED ::1:9002\n    at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1495:16)"}
import { Kafka, Producer, ProducerConfig, KafkaConfig, Admin, ITopicConfig } from "kafkajs";
import { EventData } from "../types/scriptInterfaces";

class KafkaManager {
    private kafkaConfig: KafkaConfig = {
        brokers: [process.env.KAFKA_BROKER_URL || 'localhost:9092'],
        clientId: "user-service",  
    };

    private kafka = new Kafka(this.kafkaConfig);

    private producerConfig: ProducerConfig = {
        allowAutoTopicCreation: false,
        transactionTimeout: 5000,
    };

    private producer: Producer = this.kafka.producer(this.producerConfig);

    constructor() {
        this.initializeKafkaProducer();
    }

    public async initializeKafkaAdmin() {
        const admin: Admin = this.kafka.admin({ retry: { retries: 10 } });
        try {
            console.log("Connecting Kafka admin...", admin);
            // Attempt to connect to Kafka admin with proper error handling.
            await admin.connect().catch((error) => {
                console.error('Error connecting Kafka admin:', error);
                throw error; // Re-throw the error to handle it in the outer catch block.
            });

            console.log("Kafka admin connected and trying to create topics");
            const topics: ITopicConfig[] = [
                {
                    topic: "user-services",
                    numPartitions: 2,
                    replicationFactor: 1,
                },
            ];

            const topicCreationResult = await admin.createTopics({
                waitForLeaders: true,
                topics: topics,
            });

            if (topicCreationResult) {
                console.log("Kafka topics created successfully.");
            } else {
                console.log("Kafka topics were already created.");
            }

        } catch (error) {
            console.error('Failed to create Kafka topics:', error);
        } finally {
            console.log("Disconnecting Kafka admin...");
            await admin.disconnect();
            console.log('Kafka admin disconnected.');
        }
    }

    private async initializeKafkaProducer() {
        try {
            console.log("Initializing Kafka producer...");
            await this.producer.connect();
            console.log('Kafka producer connected.');
        } catch (error) {
            console.error('Failed to connect Kafka producer:', error);
        }
    }

    public async produceEvent(topic: string, key: string, message: EventData) {
        try {
            const messages = [
                {
                    headers: { source: topic },
                    key: key,
                    value: JSON.stringify(message),
                },
            ];
            await this.producer.send({
                topic: topic,
                messages: messages,
            });
            console.log(`Message sent to topic ${topic}: ${JSON.stringify(message)}`);
        } catch (error) {
            console.error('Failed to send message:', error);
        }
    }

    public async disconnectKafkaProducer() {
        try {
            await this.producer.disconnect();
            console.log('Kafka producer disconnected.');
        } catch (error) {
            console.error('Failed to disconnect Kafka producer:', error);
        }
    }

    private consumer = this.kafka.consumer({
        groupId: 'user-service',
    });

    public async initializeKafkaConsumer() {
        try {
            await this.consumer.connect();
            console.log('Kafka consumer connected.');

            await this.consumer.subscribe({ topic: 'user-services', fromBeginning: true });
            console.log('Subscribed to topic user-services');

            await this.consumer.run({
                eachMessage: async ({ topic, partition, message }) => {
                    console.log({
                        topic,
                        partition,
                        key: message.key?.toString(),
                        value: message.value?.toString(),
                    });
                },
            });
        } catch (error) {
            console.error('Failed to initialize Kafka consumer:', error);
        }
    }

    public async disconnectKafkaConsumer() {
        try {
            await this.consumer.disconnect();
            console.log('Kafka consumer disconnected.');
        } catch (error) {
            console.error('Failed to disconnect Kafka consumer:', error);
        }
    }
}

export default KafkaManager;

import KafkaManager from "../src/kafka/kafkaManager";

class ServerManager {
    private KAFKA = new KafkaManager();

    public start() {
        new EventHandler();
        this.KAFKA.initializeKafkaAdmin();
        this.server = this.app.listen(process.env.PORT || 9001, () => {
            console.log('Server is running on port 9001');
        });
    }

    private async shutdownGracefully() {
        try {
            console.log('Performing cleanup before shutdown...');

            // Flush logs
            this.flushLogs();

            // Close database connection
            await this.closeDatabaseConnection();

            // Close Redis client
            // await this.closeRedisClient();

            await this.KAFKA.disconnectKafkaProducer();

            // Stop the server
            await this.stopServer();

            console.log('Cleanup completed. Exiting application.');
            process.exit(0); // Exit with success code
        } catch (error) {
            console.error('Error during cleanup:', error);
            process.exit(1); // Exit with failure code
        }
    }
}

// Initialize and start the server
const serverManager = new ServerManager();
new EventHandler();

// Start the server to listen for incoming requests. This will start the server and log the start message.
serverManager.start();

I was trying to connect the Kafka admin and create a topic, but the admin did not connect when I used await keywords. The same issue occurs with createTopics. Even if I do not use await, I still get the error mentioned above.

I am using Docker containers for Zookeeper and Kafka, which I start like this:

# Start Zookeeper
docker run -p 2181:2181 zookeeper

# Start Kafka with the following configurations:
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Upvotes: 0

Views: 36

Answers (0)

Related Questions