Reputation: 1115
TLDR: In essence, we produce about 800 kB/s, while one consumer is reading about 130 MB/s, which is more than 150 times what we produce ...
A little more detail. We have a few Golang producers that write about 800 kB/s to a few topics in a Redpanda (Kafka) cluster. Then we have a few KafkaJS consumers; some of them read from a single topic while others read from multiple. All consumers process the messages they are expected to read, but one of the consumers is reading far more data than expected: about 130 MB/s.
The image shows how much we read and write to the cluster.
I tried removing the consumer for a while, but adding it again just makes the read rate climb back up to 100+ MB/s. When I remove and re-add the consumer quickly, it jumps straight back to 100+ MB/s. When I add multiple consumers, the rates add up: 200+ MB/s, 300+ MB/s, and so on.
I have asked around but haven't had any luck finding anyone with the same problem, and I can't find anything wrong with the client code.
// Client and consumer setup (assumes `Kafka` and `logLevel` are imported from "kafkajs")
this.redpandaClient = new Kafka({
  brokers: [redpandaBrokerUrl],
  clientId: "time-series-storage-writer",
  logLevel: logLevel.ERROR,
});

this.redpandaConsumer = this.redpandaClient.consumer({
  groupId: "time-series-storage-writer",
  maxBytes: 1048576, // 1 MiB per fetch
  maxWaitTimeInMs: 1000,
  minBytes: 131072, // 128 KiB
});
await redpandaClient.redpandaConsumer.run({
  eachBatch: measurementProcessingUseCases.eachBatchHandler,
  eachBatchAutoResolve: true,
});
eachBatchHandler = async ({
  batch,
  heartbeat,
  isRunning,
  isStale,
}: EachBatchPayload): Promise<void> => {
  const measurements: RepositoryMeasurement[] = [];

  for (const message of batch.messages) {
    if (!isRunning() || isStale()) {
      break;
    }
    const measurement = this.preprocessMessage(message);
    measurements.push(measurement);
  }

  if (measurements.length > 0) {
    const chunks = this.divideMeasurementsIntoChunks(measurements);
    for (const chunk of chunks) {
      try {
        await this.config.timeSeriesRepository.storeMeasurements(chunk);
      } catch (error) {
        throw new Error(`An error occurred while storing: ${error}`);
      }
      await heartbeat();
    }
  }
};
Upvotes: 0
Views: 660
Reputation: 79
Since you are handling the batch yourself, you need to mark each message in the batch as processed. If you don't, the consumer will always read again from the last unresolved offset.
resolveOffset(message.offset)
await heartbeat()
It looks like the consumer keeps re-reading from the beginning because no messages are ever marked as resolved.
An example can be found in the KafkaJS docs: https://kafka.js.org/docs/consuming
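For reference, here is a minimal sketch of what that could look like applied to the handler from the question. The resolveOffset callback comes from the KafkaJS eachBatch payload; the helper and repository names are taken from the question, while the processed array is only added here to track which messages were actually handled:

eachBatchHandler = async ({
  batch,
  resolveOffset,
  heartbeat,
  isRunning,
  isStale,
}: EachBatchPayload): Promise<void> => {
  const measurements: RepositoryMeasurement[] = [];
  const processed: typeof batch.messages = [];

  for (const message of batch.messages) {
    if (!isRunning() || isStale()) {
      break;
    }
    measurements.push(this.preprocessMessage(message));
    processed.push(message);
  }

  if (measurements.length === 0) {
    return;
  }

  // Store the chunks first, exactly as in the question.
  for (const chunk of this.divideMeasurementsIntoChunks(measurements)) {
    await this.config.timeSeriesRepository.storeMeasurements(chunk);
    await heartbeat();
  }

  // Then mark every message that was actually processed as resolved,
  // so the consumer does not keep fetching from the last unresolved offset.
  for (const message of processed) {
    resolveOffset(message.offset);
  }
  await heartbeat();
};

Note that with eachBatchAutoResolve: true, KafkaJS also resolves the batch automatically when the handler returns without throwing, so explicit resolveOffset calls mainly matter when the handler can exit early or throw; it may also be worth checking whether the handler is failing and the batch is being re-fetched for that reason.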
Upvotes: 0