Reputation: 1
Registering event handlers
hii
Registering event handlers
hii
Connecting Kafka admin...
Server is running on port 9001
{"level":"ERROR","timestamp":"2024-08-09T21:22:29.409Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED ::1:9002","broker":"localhost:9002","clientId":"user-service","stack":"Error: connect ECONNREFUSED ::1:9002\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1495:16)"}
import { Kafka, Producer, ProducerConfig, KafkaConfig, Admin, ITopicConfig } from "kafkajs";
import { EventData } from "../types/scriptInterfaces";
class KafkaManager {
private kafkaConfig: KafkaConfig = {
brokers: [process.env.KAFKA_BROKER_URL || 'localhost:9092'],
clientId: "user-service",
};
private kafka = new Kafka(this.kafkaConfig);
private producerConfig: ProducerConfig = {
allowAutoTopicCreation: false,
transactionTimeout: 5000,
};
private producer: Producer = this.kafka.producer(this.producerConfig);
constructor() {
this.initializeKafkaProducer();
}
public async initializeKafkaAdmin() {
const admin: Admin = this.kafka.admin({ retry: { retries: 10 } });
try {
console.log("Connecting Kafka admin...", admin);
// Attempt to connect to Kafka admin with proper error handling.
await admin.connect().catch((error) => {
console.error('Error connecting Kafka admin:', error);
throw error; // Re-throw the error to handle it in the outer catch block.
});
console.log("Kafka admin connected and trying to create topics");
const topics: ITopicConfig[] = [
{
topic: "user-services",
numPartitions: 2,
replicationFactor: 1,
},
];
const topicCreationResult = await admin.createTopics({
waitForLeaders: true,
topics: topics,
});
if (topicCreationResult) {
console.log("Kafka topics created successfully.");
} else {
console.log("Kafka topics were already created.");
}
} catch (error) {
console.error('Failed to create Kafka topics:', error);
} finally {
console.log("Disconnecting Kafka admin...");
await admin.disconnect();
console.log('Kafka admin disconnected.');
}
}
private async initializeKafkaProducer() {
try {
console.log("Initializing Kafka producer...");
await this.producer.connect();
console.log('Kafka producer connected.');
} catch (error) {
console.error('Failed to connect Kafka producer:', error);
}
}
public async produceEvent(topic: string, key: string, message: EventData) {
try {
const messages = [
{
headers: { source: topic },
key: key,
value: JSON.stringify(message),
},
];
await this.producer.send({
topic: topic,
messages: messages,
});
console.log(`Message sent to topic ${topic}: ${JSON.stringify(message)}`);
} catch (error) {
console.error('Failed to send message:', error);
}
}
public async disconnectKafkaProducer() {
try {
await this.producer.disconnect();
console.log('Kafka producer disconnected.');
} catch (error) {
console.error('Failed to disconnect Kafka producer:', error);
}
}
private consumer = this.kafka.consumer({
groupId: 'user-service',
});
public async initializeKafkaConsumer() {
try {
await this.consumer.connect();
console.log('Kafka consumer connected.');
await this.consumer.subscribe({ topic: 'user-services', fromBeginning: true });
console.log('Subscribed to topic user-services');
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic,
partition,
key: message.key?.toString(),
value: message.value?.toString(),
});
},
});
} catch (error) {
console.error('Failed to initialize Kafka consumer:', error);
}
}
public async disconnectKafkaConsumer() {
try {
await this.consumer.disconnect();
console.log('Kafka consumer disconnected.');
} catch (error) {
console.error('Failed to disconnect Kafka consumer:', error);
}
}
}
export default KafkaManager;
import KafkaManager from "../src/kafka/kafkaManager";
class ServerManager {
private KAFKA = new KafkaManager();
public start() {
new EventHandler();
this.KAFKA.initializeKafkaAdmin();
this.server = this.app.listen(process.env.PORT || 9001, () => {
console.log('Server is running on port 9001');
});
}
private async shutdownGracefully() {
try {
console.log('Performing cleanup before shutdown...');
// Flush logs
this.flushLogs();
// Close database connection
await this.closeDatabaseConnection();
// Close Redis client
// await this.closeRedisClient();
await this.KAFKA.disconnectKafkaProducer();
// Stop the server
await this.stopServer();
console.log('Cleanup completed. Exiting application.');
process.exit(0); // Exit with success code
} catch (error) {
console.error('Error during cleanup:', error);
process.exit(1); // Exit with failure code
}
}
}
// Initialize and start the server
const serverManager = new ServerManager();
new EventHandler();
// Start the server to listen for incoming requests. This will start the server and log the start message.
serverManager.start();
I was trying to connect the Kafka admin and create a topic, but the admin did not connect when I used await keywords. The same issue occurs with createTopics. Even if I do not use await, I still get the error mentioned above.
I am using Docker containers for Zookeeper and Kafka, which I start like this:
# Start Zookeeper
docker run -p 2181:2181 zookeeper
# Start Kafka with the following configurations:
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka
Upvotes: 0
Views: 36