Miguel Barros
Miguel Barros

Reputation: 21

GRPC Severe Memory Leak With Netty

I'm getting an exception sometimes on GRPC using netty.

I'm creating a bunch of servers like so:

        for(int i = 0; i < 4; i++) {
            var executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
            var eventGroup = new NioEventLoopGroup(1); //One thread for each channel
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            _channels[i] = NettyChannelBuilder
                    .forAddress(host, port + i)
                    .executor(executor)
                    .channelType(NioSocketChannel.class)
                    .eventLoopGroup(eventGroup)
                    .usePlaintext()
                    .build();
            var stub = ServiceDPASGrpc.newStub(_channels[i]);
            _stubs[i] = stub;
            _executors[i] = executor;
        }
        for (int i = 0; i < 4; i++) {
            var impl = ...
            _servers[i] = NettyServerBuilder.forPort(port + i).addService(impl).build();
            _servers[i].start();
            _impls[i] = impl;

        }

at the end of the application i close everything like so:

        for (int i = 0; i < 4; i++) {
            _channels[i].shutdown();
            _executors[i].shutdown();
            _servers[i].shutdown();

        }

However sometimes I get an exception stating:

Apr 21, 2020 2:32:51 PM io.grpc.netty.shaded.io.netty.util.ResourceLeakDetector reportTracedLeak
SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
        io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
        io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
        io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
        io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
        io.grpc.netty.shaded.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
        io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
        io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:834)

what am I doing wrong

Upvotes: 1

Views: 2257

Answers (1)

creamsoup
creamsoup

Reputation: 878

few things to check.

  1. you don't need to create EventLoop of size 1 in each loop. each channel is using only 1 EventLoop from EventLoopGroup. so, you can just pass in EventLoopGroup of larger threads number (if it is smaller than # of channels some of thread is shared).

  2. channel shutdown is not a blocking operation. in background, it still need to some work to wrap up etc. you need to wait to gracefully shutdown.

_channels[i]
  .shutdown() // could use `shutdownNow`, but still need to wait
  .awaitTermination(5, TimeUnit.SECONDS); // or whatever time

Upvotes: 1

Related Questions