gomzee

Reputation: 103

Memory spikes while consuming from a Kafka topic and publishing to a topic chosen from the consumed message

I have a route that consumes JSON strings from a Kafka topic and sends the data to another route, direct:streamerRoute:

from("direct:streamerRoute").routeId("streamerRoute")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {

                        DocumentContext jsonContext = JsonPath.using(configuration).parse(exchange.getIn().getBody(String.class));
                        String tableName = jsonContext.read(eventTableNameInputExpr);
                        String eventId = jsonContext.read(eventIdInputExpr);
                        log.info("eventId: {}, tableName={}", eventId, tableName);

                        exchange.setProperty("staticDataYes", isStaticData(tableName));
                        exchange.setProperty("transactionDataYes", isTransactionData(tableName));
                    }
                })
                .choice()
                    .when(exchangeProperty("staticDataYes").isEqualTo(true))
                        .to("direct:StaticData")
                    .when(exchangeProperty("transactionDataYes").isEqualTo(true))
                        .to("direct:TransactionData")
                    .otherwise()
                        .log("The event is neither static data nor transaction data")
                .end();
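The isStaticData / isTransactionData helpers are not shown in the question; a minimal, self-contained sketch of what they might look like (the table-name sets here are purely hypothetical) is:

```java
import java.util.Set;

// Hypothetical implementations of the isStaticData / isTransactionData helpers
// used by the processor above. The actual table names are not shown in the
// question, so the sets below are placeholders.
public class TableClassifier {

    private static final Set<String> STATIC_TABLES = Set.of("COUNTRY", "CURRENCY");
    private static final Set<String> TRANSACTION_TABLES = Set.of("PAYMENT", "TRADE");

    public static boolean isStaticData(String tableName) {
        // Null-safe, case-insensitive membership test
        return tableName != null && STATIC_TABLES.contains(tableName.toUpperCase());
    }

    public static boolean isTransactionData(String tableName) {
        return tableName != null && TRANSACTION_TABLES.contains(tableName.toUpperCase());
    }
}
```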

The direct:StaticData and direct:TransactionData routes simply publish to their respective topics.
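For context, the shape of those downstream routes would be roughly the following (the topic names and endpoint options here are assumptions, since they are not shown in the question):

```java
// Hypothetical downstream routes: each just forwards the body to a Kafka topic.
from("direct:StaticData").routeId("staticDataRoute")
        .to("kafka:static-data-topic?brokers={{kafka.brokers}}");

from("direct:TransactionData").routeId("transactionDataRoute")
        .to("kafka:transaction-data-topic?brokers={{kafka.brokers}}");
```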

I can see my pod's memory gradually increasing; it reaches up to 4.5 GB for 150K messages.

I cannot figure out what is going wrong.

I ran JProfiler locally; the only observation I have is that multiple byte[] objects keep accumulating, and memory keeps increasing along with this accumulation.

Based on some feedback from ChatGPT, I also added the following to my main consumer route:

.noMessageHistory()
.streamCaching("false")

The processing is very simple, so I do not expect memory to grow this way. Even after all messages are processed, memory utilisation does not come down.
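One thing worth checking is whether the live object set really stays high, or whether the JVM has freed the objects but simply not returned the heap to the OS (which makes pod memory look permanently elevated). A quick way to observe the live heap from inside the application is the standard MemoryMXBean:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Logs the JVM's used heap after hinting a GC, so the figure reflects
// (approximately) live objects rather than garbage awaiting collection.
public class HeapLogger {

    public static long usedHeapBytes() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed();
    }

    public static void main(String[] args) {
        System.gc(); // only a hint, but usually sufficient for a rough reading
        System.out.printf("Used heap: %.1f MB%n", usedHeapBytes() / 1e6);
    }
}
```

If this number drops back down after processing while the pod's RSS stays high, the problem is heap sizing/retained native memory rather than a leak of Java objects.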

It looks like a memory leak is happening.

If anyone has faced a similar situation, please help.

Upvotes: 0

Views: 26

Answers (1)

gomzee

Reputation: 103

I managed to find the problem. I was using the Camel observability library, and with JProfiler I realised that it was somehow holding objects in memory. After removing the library, my memory and CPU utilisation were good, and processing was considerably faster this time.

Upvotes: 0
