Reputation: 375
I have implemented a NettyDecoder in one of my application
The protocol of the application is simple first four characters would be the message length then the message.
The frame decoder logic is
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.cs.StandardCharsets;
public class ITMDecoder extends FrameDecoder {
public static String bytesToStringUTFCustom(byte[] bytes) {
char[] buffer = new char[bytes.length >> 1];
for(int i = 0; i < buffer.length; i++) {
int bpos = i << 1;
char c = (char)(((bytes[bpos]&0x00FF)<<8) + (bytes[bpos+1]&0x00FF));
buffer[i] = c;
}
return new String(buffer);
}
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf) throws Exception {
Logger logger = LoggerFactory.getLogger(ITMDecoder.class);
// Make sure if the length field was received.
if (buf.readableBytes() < 4) {
// The length field was not received yet - return null.
// This method will be invoked again when more packets are
// received and appended to the buffer.
return null;
}
// The length field is in the buffer.
// Mark the current buffer position before reading the length field
// because the whole frame might not be in the buffer yet.
// We will reset the buffer position to the marked position if
// there's not enough bytes in the buffer.
buf.markReaderIndex();
// Read the length field.
byte[] twoBytesLength = new byte[4];
for(int i = 0 ; i < 4 ; i++)
twoBytesLength[i] = buf.getByte(i);
String str = new String(twoBytesLength, "UTF-8");
Short shortValue = Short.parseShort(str);
int length = shortValue.intValue() + 4;
// Make sure if there's enough bytes in the buffer.
if (buf.readableBytes() < length) {
// The whole bytes were not received yet - return null.
// This method will be invoked again when more packets are
// received and appended to the buffer.
// Reset to the marked position to read the length field again
// next time.
buf.resetReaderIndex();
return null;
}
// There's enough bytes in the buffer. Read it.
ChannelBuffer frame = buf.readBytes(length);
// Successfully decoded a frame. Return the decoded frame.
return frame;
}
}
Channel Pipeline Logic is:
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ITMDecoder(),
new M3AlertHandler()
);
};
});
It works fine when the transaction volume is around 2 tps . However the frame decoder corrupts when transactions are send with higher tps.
I have checked the same with Socket workbench with one long message of 2 variable lengths
The message i used to sent to the server is Message 1 = 00051234500041234
Repeated the same 1000 times and the sent it in a second the decoder corrupts after 5/6 messages ?
Is there anything that I am missing which will actually make it work properly?
Upvotes: 1
Views: 1131
Reputation: 12351
for(int i = 0 ; i < 4 ; i++)
twoBytesLength[i] = buf.getByte(i);
You should never assume that the index of the first byte is 0
. The index of the first byte is buf.readerIndex()
.
for (int i = 0; i < 4; i ++) {
twoBytesLength[i] = buf.getByte(buf.readerIndex() + i);
}
You could optimize byte buffer access even further but that's not the scope of the question.
Upvotes: 3