gravetii

Reputation: 9624

Correctly releasing reference-counted ByteBuf objects in netty 4.1

So, we are currently in the process of upgrading netty 3.x to netty 4.1 in our MQTT-based messaging backend. In our application, we use a custom MQTT message decoder and encoder.

For our decoder, I am currently using a ByteToMessageDecoder as follows:

public class MqttMessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 2) {
            return;
        }

        .....
        .....
        .....

        byte[] data = new byte[msglength];
        in.resetReaderIndex();
        in.readBytes(data);
        MessageInputStream mis = new MessageInputStream(
                new ByteArrayInputStream(data));
        Message msg = mis.readMessage();
        out.add(msg);
        ReferenceCountUtil.release(in);
    }
}

Here, Message is our custom object, which is passed to the next ChannelHandler's channelRead(). As you can see, I am done with the incoming ByteBuf (in) as soon as I have created a Message from it. Since ByteBuf is reference-counted in netty, is it correct that I need to release in here by calling ReferenceCountUtil.release(in)? This seems right according to the docs. However, when I do this, I get the following exception:

Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception.
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final]
    ... 7 common frames omitted

This tells me that when the child channel is closed, all the handlers in the pipeline are removed one after another. When this decoder is removed, ByteToMessageDecoder#handlerRemoved releases the ByteBuf it still holds. Since my decode() has already released that buffer explicitly, the second release results in the IllegalReferenceCountException when the method below is called.

This is the AbstractReferenceCountedByteBuf#release:

@Override
    public boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

What is the correct way to release the ByteBuf objects then, to not encounter this issue?

I am using the PooledByteBufAllocator -

new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

Please let me know if you need any more info regarding the configuration.


EDIT:

As an add-on to Ferrybig's answer, the ByteToMessageDecoder#channelRead handles the releasing of the incoming ByteBufs by itself. See the finally block -

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

If a ByteBuf is passed on to the next channel handler down the pipeline, responsibility for releasing it passes along with it. A slice of the inbound buffer must first have its reference count increased through ByteBuf#retain (or be created with readRetainedSlice), because the decoder still releases the inbound buffer itself. If the next handler after your decoder is your business handler (which is typically the case), you need to release that ByteBuf there to avoid memory leaks. This is also mentioned in the docs here.
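To make the hand-over concrete, here is a minimal sketch, not our production code. It assumes a made-up framing (one length byte followed by the payload), and the class names RetainedFrameSketch, FrameDecoder and BusinessHandler are hypothetical. The decoder passes a retained slice downstream, and the business handler extends SimpleChannelInboundHandler, which releases the message after channelRead0() returns.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;

public class RetainedFrameSketch {

    /** Hypothetical framing: 1 length byte, then that many payload bytes. */
    static class FrameDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            if (in.readableBytes() < 1) {
                return; // wait for more data
            }
            in.markReaderIndex();
            int length = in.readUnsignedByte();
            if (in.readableBytes() < length) {
                in.resetReaderIndex(); // incomplete frame, try again later
                return;
            }
            // The retained slice carries its own reference, which the
            // downstream handler must release. 'in' itself is NOT released here.
            out.add(in.readRetainedSlice(length));
        }
    }

    /** SimpleChannelInboundHandler releases the frame after channelRead0(). */
    static class BusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
        String lastPayload;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf frame) {
            lastPayload = frame.toString(CharsetUtil.UTF_8);
        }
    }

    public static void main(String[] args) {
        BusinessHandler handler = new BusinessHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new FrameDecoder(), handler);

        ByteBuf input = Unpooled.buffer().writeByte(2).writeBytes(new byte[] {'h', 'i'});
        channel.writeInbound(input);

        System.out.println("payload = " + handler.lastPayload);
        System.out.println("input refCnt = " + input.refCnt());
        channel.finish();
    }
}
```

With an unpooled buffer as above, the reference held by the decoder and the one held by the slice are both dropped, so the input's refCnt should end at 0 without any explicit release in decode().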

Upvotes: 2

Views: 4425

Answers (2)

xinnjie

Reputation: 732

Ferrybig's answer is good enough.

Here I want to add a simple convention for dealing with ByteBuf release.

When you have an inbound handler like this:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    ...
    // no fireChannelRead()
}

Since you stop the message from reaching any inbound handler after this one (if there is one), no one else is taking care of the ByteBuf object, so you need to release it manually.

You can also call fireChannelRead(). By default, Netty adds a tail handler to the pipeline that releases the ByteBuf for you, so you do not need to release it after calling fireChannelRead(), provided every later handler either passes it on or releases it.
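The first case (consume the message and release it yourself) might look roughly like the sketch below. TerminalHandlerSketch and ConsumingHandler are hypothetical names, and the EmbeddedChannel in main() is only there so that the reference count can be checked without a real socket.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class TerminalHandlerSketch {

    /** Consumes the message and does not forward it, so it must release it. */
    static class ConsumingHandler extends ChannelInboundHandlerAdapter {
        String lastText;

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            try {
                ByteBuf buf = (ByteBuf) msg;
                lastText = buf.toString(CharsetUtil.UTF_8);
                // no ctx.fireChannelRead(msg): the message stops here
            } finally {
                // Release exactly once, even if the processing above throws.
                ReferenceCountUtil.release(msg);
            }
        }
    }

    public static void main(String[] args) {
        ConsumingHandler handler = new ConsumingHandler();
        EmbeddedChannel channel = new EmbeddedChannel(handler);

        ByteBuf input = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
        channel.writeInbound(input);

        System.out.println("text = " + handler.lastText);
        System.out.println("input refCnt = " + input.refCnt());
        channel.finish();
    }
}
```

If the handler called ctx.fireChannelRead(msg) instead, the release in the finally block would have to go, otherwise the buffer would be released twice.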

If my answer has any problem, please tell me.

Upvotes: 2

Ferrybig

Reputation: 18824

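Not all handlers require you to release the ByteBuf that is passed in. ByteToMessageDecoder is one of them.

The reason for this is that this handler collects multiple incoming ByteBufs and exposes them to your application as one continuous stream of bytes, for ease of coding, so that you do not need to handle these chunks yourself. The handler owns that cumulation buffer and releases it itself.

Remember that you still need to manually release any ByteBufs you create yourself, for example with readBytes(int) or readRetainedSlice(int), unless you add them to the out list, as stated by the javadoc. A plain readSlice(int) shares the reference count of the input buffer and needs no separate release.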
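Applied to a decoder like the one in the question, that means simply dropping the release(in) call. The sketch below is only an illustration: the original MQTT parsing is elided in the question, so it assumes a made-up framing of one length byte followed by the payload, and DecoderReleaseSketch and LengthPrefixedDecoder are hypothetical names.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class DecoderReleaseSketch {

    /** Hypothetical framing: 1 length byte, then that many payload bytes. */
    static class LengthPrefixedDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            if (in.readableBytes() < 1) {
                return; // wait for more data
            }
            in.markReaderIndex();
            int length = in.readUnsignedByte();
            if (in.readableBytes() < length) {
                in.resetReaderIndex(); // incomplete frame, try again later
                return;
            }
            // readBytes(byte[]) copies into a plain array, so no new
            // reference-counted buffer is created here.
            byte[] data = new byte[length];
            in.readBytes(data);
            out.add(data);
            // No ReferenceCountUtil.release(in): ByteToMessageDecoder owns 'in'.
        }
    }

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(new LengthPrefixedDecoder());

        ByteBuf input = Unpooled.buffer().writeByte(3).writeBytes(new byte[] {1, 2, 3});
        channel.writeInbound(input);

        byte[] frame = channel.readInbound();
        System.out.println("frame length = " + frame.length);
        System.out.println("input refCnt = " + input.refCnt());

        // Closing the channel removes the handler; with no double release,
        // handlerRemoved() should not throw IllegalReferenceCountException.
        channel.finish();
    }
}
```

Because the decoded message is a byte[] rather than a ByteBuf, nothing downstream has to release anything in this variant.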

Upvotes: 4
