u6f6o

Reputation: 2190

How to efficiently distribute inbound netty.io ByteBuf messages to a ChannelGroup?

I created a netty.io Bootstrap that receives streaming data from a legacy server. The server sends the data using the ISO-8859-1 charset. There is also an internal "protocol" in place that uses different separator bytes:

private static final byte GS = 29;  // ASCII 0x1D (group separator)
private static final byte RS = 30;  // ASCII 0x1E (record separator)
private static final byte US = 31;  // ASCII 0x1F (unit separator)

private static final ByteProcessor GROUP_SEPARATOR_LOCATOR = value -> value != GS;
private static final ByteProcessor RECORD_SEPARATOR_LOCATOR = value -> value != RS;
private static final ByteProcessor UNIT_SEPARATOR_LOCATOR = value -> value != US;

These ByteProcessor instances are used to split the messages. Every message is finally translated into its corresponding object representation, in which keyValueMapping holds the main content of the original message:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, byte[]> keyValueMapping;
    // SOME OTHER STUFF
}

Subsequently, all updates are forwarded to all connected web-socket clients, which are handled by a separate ServerBootstrap:

public void distribute(ChannelGroup recipients, Object msg) {
    Update updateMsg = (Update) msg;
    recipients.writeAndFlush(updateMsg);
}

When I activated Java Flight Recording and performed some load tests, I realised that the main allocation hotspot is the method that translates the values from the initial inbound message into an ISO-8859-1 byte array:

private byte[] translateValue(ByteBuf in) {
    byte [] result;

    if (!in.hasArray()) {
        result = new byte[in.readableBytes()];
        in.getBytes(in.readerIndex(), result);
    } else {
        result = in.array();
    }
    return result;
}

Initially I did not translate the ByteBufs and stored them directly in the Update's keyValueMapping map. Since a ByteBuf maintains internal indices (reader, writer, marks, etc.) that are, by design, not guarded, I was afraid to simply wrap and forward these ByteBufs to different channels (see the recipients ChannelGroup above) and decided to go with the byte[] representation instead.

Looking at the Java Flight Recording results, I wonder whether there is a recommended way to distribute unchanged inbound data to a group of different channels without burdening the GC too much. Judging from the results, the inbound channel uses direct buffers, because a lot of new byte arrays are being created.

To give more context, I also add the code that performs the remaining message translation:

while (in.readableBytes() > 0) {
    ByteBuf keyAsByteBuf = nextToken(in, UNIT_SEPARATOR_LOCATOR);
    String key = translateKey(keyAsByteBuf);

    if (key != null) {
        ByteBuf valueAsByteBuf = nextToken(in, RECORD_SEPARATOR_LOCATOR);
        byte[] value = translateValue(valueAsByteBuf);

        if (value.length > 0) {
            mapping.put(key, value); 
        }
    }
}

private ByteBuf nextToken(ByteBuf in, ByteProcessor locator) {
    int separatorIdx = in.forEachByte(in.readerIndex(), in.readableBytes(), locator);

    if (separatorIdx >= 0) {
        ByteBuf token = in.readSlice(separatorIdx - in.readerIndex());
        in.skipBytes(1);
        return token;
    }
    return in.readSlice(in.readableBytes());
}

private String translateKey(ByteBuf in) {
    return keyTranslator.translate(in);
}

Upvotes: 1

Views: 713

Answers (1)

Dmitriy Dumanskiy

Reputation: 12817

Hm... Actually, your question is not that simple. I'll try to answer briefly.

You don't need to translate the ByteBuf to a byte[] if your app doesn't need that. So I assume you have the following structure:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, ByteBuf> keyValueMapping;
}

The catch here is that you only partially parse the ByteBuf, so you end up with a Java object that holds ByteBufs.

That's totally fine, and you can keep working with those ByteBufs. Your Update class should implement the ReferenceCounted interface, so that when you call recipients.writeAndFlush(updateMsg) (assuming recipients is a DefaultChannelGroup), Netty's DefaultChannelGroup handles the references to those buffers.

So what happens:

After recipients.writeAndFlush(updateMsg), DefaultChannelGroup loops over its channels and sends your updateMsg to each one with channel.writeAndFlush(safeDuplicate(message)). safeDuplicate is a special method that handles references to Netty ByteBufs so that the same buffer can be sent to multiple receivers. It does not copy the bytes: retainedDuplicate() returns a view that shares the content, has its own reader and writer indices, and increments the reference count. However, your object is not a ByteBuf but a plain Java object. Here is the code of that method:

private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        return ((ByteBuf) message).retainedDuplicate();
    } else if (message instanceof ByteBufHolder) {
        return ((ByteBufHolder) message).retainedDuplicate();
    } else {
        return ReferenceCountUtil.retain(message);
    }
}
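
To illustrate why a duplicate per recipient addresses the worry about shared indices, here is a minimal sketch that runs without Netty. It uses the JDK's java.nio.ByteBuffer purely as a stand-in for ByteBuf (an assumption made for the example): its duplicate() also shares the content while keeping position and limit independent, but unlike ByteBuf it has no reference count. The class name DuplicateViewSketch and the helper drain are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DuplicateViewSketch {
    /** Reads all remaining bytes of one view; only that view's position advances. */
    static String drain(ByteBuffer view) {
        byte[] bytes = new byte[view.remaining()]; // copied only to print the content
        view.get(bytes);
        return new String(bytes, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.wrap("value".getBytes(StandardCharsets.ISO_8859_1));

        // One view per recipient: same backing bytes, separate indices.
        ByteBuffer forClientA = original.duplicate();
        ByteBuffer forClientB = original.duplicate();

        System.out.println(drain(forClientA));      // prints "value"
        System.out.println(forClientB.remaining()); // prints 5, untouched by client A's read
        System.out.println(original.remaining());   // prints 5
    }
}
```

With ByteBuf the same holds for the reader and writer indices of each retainedDuplicate(), with the addition that every view must eventually be released.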

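For the buffer references to be handled correctly, Update must therefore implement ReferenceCounted, because safeDuplicate falls through to ReferenceCountUtil.retain(message) for it. Something like this:

public class Update implements ReferenceCounted {
    // Fields and constructor as above; refCnt(), touch() and release() are omitted here.

    @Override
    public final Update retain() {
        // Each recipient gets its own Update whose buffers have independent indices.
        return new Update(id, updateType, makeRetainedBuffers());
    }

    private Map<String, ByteBuf> makeRetainedBuffers() {
        Map<String, ByteBuf> newMap = new HashMap<>();
        for (Map.Entry<String, ByteBuf> entry : keyValueMapping.entrySet()) {
            // retainedDuplicate() shares the bytes and increments the reference count.
            newMap.put(entry.getKey(), entry.getValue().retainedDuplicate());
        }
        return newMap;
    }
}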

This is just a sketch, but you should get the idea. You also have to implement the release() method in the Update class and make sure it always releases all buffers it holds. I assume you already have an encoder in your pipeline for the Update class that will call release().
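
To make the retain/release bookkeeping concrete, here is a minimal model of it that runs without Netty. CountedBuffer is a hypothetical stand-in for a reference-counted buffer (it is not a Netty class), and UpdateModel mirrors the approach above under that assumption: retain() hands out a new object holding one extra reference per buffer, and release() gives each of those references back.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {
    /** Hypothetical stand-in for a reference-counted buffer; starts with one reference. */
    static final class CountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        CountedBuffer retain() {
            refCnt.updateAndGet(c -> {
                if (c == 0) throw new IllegalStateException("buffer already freed");
                return c + 1;
            });
            return this;
        }

        /** Returns true when the last reference was given back. */
        boolean release() {
            int remaining = refCnt.updateAndGet(c -> {
                if (c == 0) throw new IllegalStateException("buffer already freed");
                return c - 1;
            });
            return remaining == 0;
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Models an Update that owns one reference to each buffer in its map. */
    static final class UpdateModel {
        private final Map<String, CountedBuffer> values;

        UpdateModel(Map<String, CountedBuffer> values) {
            this.values = values;
        }

        /** New holder for one recipient; takes one extra reference per buffer. */
        UpdateModel retain() {
            Map<String, CountedBuffer> copy = new HashMap<>();
            values.forEach((key, buffer) -> copy.put(key, buffer.retain()));
            return new UpdateModel(copy);
        }

        /** Gives back the references this holder owns; call exactly once per holder. */
        void release() {
            values.values().forEach(CountedBuffer::release);
        }
    }

    public static void main(String[] args) {
        CountedBuffer buffer = new CountedBuffer();
        Map<String, CountedBuffer> values = new HashMap<>();
        values.put("key", buffer);
        UpdateModel original = new UpdateModel(values);

        UpdateModel forClientA = original.retain();
        UpdateModel forClientB = original.retain();
        System.out.println(buffer.refCnt()); // prints 3

        forClientA.release();
        forClientB.release();
        original.release();
        System.out.println(buffer.refCnt()); // prints 0
    }
}
```

The point of the model is the balance: every holder created by retain() has to be released once, and so does the original, or the buffers are never freed.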

Another option would be to implement your own variant of DefaultChannelGroup. In that case you don't have to rely on the safeDuplicate method and thus don't need to implement ReferenceCounted; however, you still need to handle retain and release manually inside that class.

Upvotes: 1
