We are currently upgrading from Netty 3.x to Netty 4.1 in our MQTT-based messaging backend. In our application, we use a custom MQTT message decoder and encoder.
For our decoder, I am currently using a ByteToMessageDecoder as follows:
public class MqttMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 2) {
            return;
        }
        .....
        .....
        .....
        byte[] data = new byte[msglength];
        in.resetReaderIndex();
        in.readBytes(data);
        MessageInputStream mis = new MessageInputStream(
                new ByteArrayInputStream(data));
        Message msg = mis.readMessage();
        out.add(msg);
        ReferenceCountUtil.release(in);
    }
}
where Message is our custom object that is passed to the next ChannelHandler's channelRead(). As you can see, I am done with the incoming ByteBuf object in as soon as I create a Message object from it. So, since ByteBuf is reference-counted in Netty, is it correct that I need to release the in object here by calling ReferenceCountUtil.release(in)? This seems right according to the docs. However, when I do this, I get the following exception:
Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception.
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final]
... 7 common frames omitted
This tells me that when the child channel is closed, all the handlers in the pipeline are removed one after another. When this decoder is removed, ByteToMessageDecoder#handlerRemoved releases the ByteBuf attached to the decoder, which I have already released explicitly in decode(). That second release results in the IllegalReferenceCountException thrown by the method below.
This is AbstractReferenceCountedByteBuf#release:
@Override
public boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}
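To make the failure mode concrete, here is a minimal stand-alone sketch of the same double release. It is a simplified model, not Netty's code: CountedBuffer and DoubleReleaseDemo are hypothetical names, and a plain IllegalStateException stands in for IllegalReferenceCountException. It assumes only that a new buffer starts with a count of 1 and that releasing at 0 is an error, as in the method above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DoubleReleaseDemo {
    /** Simplified stand-in for a reference-counted buffer (hypothetical, not Netty's class). */
    static final class CountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer starts at 1

        int refCnt() {
            return refCnt.get();
        }

        /** Decrements the count; returns true when the count reaches 0. */
        boolean release() {
            for (;;) {
                int current = refCnt.get();
                if (current == 0) {
                    // Nothing left to release: this models the error in the stack trace.
                    throw new IllegalStateException("refCnt: 0, decrement: 1");
                }
                if (refCnt.compareAndSet(current, current - 1)) {
                    return current == 1;
                }
            }
        }
    }

    /** Returns true if the owner's release fails after the decoder did (or did not) release. */
    static boolean ownerReleaseFails(boolean decoderReleasesInput) {
        CountedBuffer cumulation = new CountedBuffer();
        if (decoderReleasesInput) {
            cumulation.release(); // models ReferenceCountUtil.release(in) inside decode()
        }
        try {
            cumulation.release(); // models the release done by the owning base class on cleanup
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(ownerReleaseFails(true));  // decoder released first: owner's release fails
        System.out.println(ownerReleaseFails(false)); // decoder left the buffer alone: no error
    }
}
```

In this model the error disappears as soon as the decoder stops releasing a buffer it does not own.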
What is the correct way to release the ByteBuf objects then, so that I do not run into this issue?
I am using the PooledByteBufAllocator:
new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
Please let me know if you need any more info regarding the configuration.
EDIT:
As an add-on to Ferrybig's answer, ByteToMessageDecoder#channelRead handles the releasing of the incoming ByteBufs by itself. See the finally block:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
If the inbound ByteBuf (or a slice of it) is passed on to the next channel handler down the pipeline, its reference count has to be increased through ByteBuf#retain first. So if the next handler after your decoder is your business handler (which is typically the case), you need to release that ByteBuf there to avoid memory leaks. This is also mentioned in the docs.
Upvotes: 2
@Ferrybig's answer is good enough.
Here I want to add a simple convention for releasing a ByteBuf.
When you have an inbound handler like this:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    ...
    // no fireChannelRead()
}
Since you do not pass the message on to the inbound handlers after this one (if there are any), nobody else takes care of the ByteBuf object, so you need to release it manually.
Alternatively, you can call fireChannelRead(). By default, Netty adds a tail handler to the pipeline that releases the ByteBuf for you, so you do not need to release it after calling fireChannelRead().
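The convention can be sketched with a toy pipeline. This is only a model under stated assumptions, not Netty's API: Msg, Handler, fire and refCntAfter are hypothetical names, a handler returning true stands in for calling fireChannelRead(), and the final release in fire stands in for the default tail handler.

```java
import java.util.ArrayList;
import java.util.List;

class ReleaseConventionDemo {
    /** Toy reference-counted message standing in for a ByteBuf (hypothetical). */
    static final class Msg {
        int refCnt = 1;

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    /** Returns true to pass the message on, false to consume it. */
    interface Handler {
        boolean channelRead(Msg msg);
    }

    /** Runs the message through the handlers; whatever reaches the end is released. */
    static void fire(List<Handler> pipeline, Msg msg) {
        for (Handler handler : pipeline) {
            if (!handler.channelRead(msg)) {
                return; // consumed: releasing was the consuming handler's job
            }
        }
        msg.release(); // models the tail of the pipeline releasing unhandled messages
    }

    /** Sends a fresh message through a single handler and reports its final count. */
    static int refCntAfter(Handler handler) {
        Msg msg = new Msg();
        List<Handler> pipeline = new ArrayList<>();
        pipeline.add(handler);
        fire(pipeline, msg);
        return msg.refCnt;
    }

    public static void main(String[] args) {
        // Passes the message on: the tail releases it.
        System.out.println(refCntAfter(m -> true));
        // Consumes the message and releases it manually.
        System.out.println(refCntAfter(m -> { m.release(); return false; }));
        // Consumes the message without releasing it: the count stays at 1 (a leak).
        System.out.println(refCntAfter(m -> false));
    }
}
```

Only the third handler leaves the count above zero, which is the case the convention is meant to prevent.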
If my answer has any problem, please tell me.
Upvotes: 2
Not all handlers require the passed-in ByteBuf to be released. ByteToMessageDecoder is one of them.
The reason for this is that this handler collects multiple incoming ByteBufs and exposes them to your application as one continuous stream of bytes, for ease of coding, so that you do not need to handle these chunks yourself.
Remember that you still need to manually release any ByteBuf you create with readBytes(int), as stated by the javadoc. A buffer obtained from readSlice(int), by contrast, does not increase the reference count, so it only needs a release if you retain it yourself.
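The idea of collecting chunks into one continuous stream can be sketched without Netty. This is a simplified model, not how ByteToMessageDecoder is implemented: CumulationDemo is a hypothetical class, and the framing (a 1-byte length prefix followed by an ASCII payload) is an assumption made for the example, not MQTT framing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CumulationDemo {
    private byte[] cumulation = new byte[0]; // bytes received but not yet decoded

    /** Appends a chunk, then decodes every complete frame that is now available. */
    List<String> channelRead(byte[] chunk) {
        byte[] merged = Arrays.copyOf(cumulation, cumulation.length + chunk.length);
        System.arraycopy(chunk, 0, merged, cumulation.length, chunk.length);
        cumulation = merged;

        List<String> out = new ArrayList<>();
        int readerIndex = 0;
        while (cumulation.length - readerIndex >= 1) {
            int length = cumulation[readerIndex] & 0xFF; // assumed 1-byte length prefix
            if (cumulation.length - readerIndex - 1 < length) {
                break; // frame incomplete: wait for more chunks
            }
            out.add(new String(cumulation, readerIndex + 1, length, StandardCharsets.US_ASCII));
            readerIndex += 1 + length;
        }
        // Drop the consumed bytes; the decoder owns this buffer, callers never free it.
        cumulation = Arrays.copyOfRange(cumulation, readerIndex, cumulation.length);
        return out;
    }

    int pendingBytes() {
        return cumulation.length;
    }

    public static void main(String[] args) {
        CumulationDemo decoder = new CumulationDemo();
        // The first frame is split across two chunks, so nothing is decoded yet.
        System.out.println(decoder.channelRead(new byte[] {5, 'h', 'e'}));
        // The rest of the first frame arrives together with a complete second frame.
        System.out.println(decoder.channelRead(new byte[] {'l', 'l', 'o', 2, 'o', 'k'}));
    }
}
```

The decoding logic never sees chunk boundaries, and the accumulated buffer is managed entirely by the decoder, which is why user code should not release it.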
Upvotes: 4