Reputation: 1093
I'm new to Netty and have run into a problem after reading the user guide and studying the factorial example. For my project I need to set up a one-to-many server-client connection, which will be used for sending data from electronic passports for remote verification. I have chosen Netty 4.0.23, as it is currently the latest stable version.
The problem is in my adaptation of the factorial example. When I send a message through my encoder and decoder, the decode method is called three times, which leads to exceptions after reading from the buffer.
My minimal working example is below. Please note that SSL is not enabled, since I do not set the ssl system property at launch.
The Server initializer
public class VerifyServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public VerifyServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// Convert Strings to ByteBufs and vice versa
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageEncoder());
// Add the business logic.
pipeline.addLast(new VerifyServerHandler());
}
}
The Server itself
public class VerifyServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
// TODO: Change to a trusted certificate
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new VerifyServerInitializer(sslCtx));
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
The Server handler
public class VerifyServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Incoming data: " + msg);
}
}
The Encoder
public class MessageEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
System.out.printf("Encoding '%s' into a byte array.\n", msg);
byte[] encoded = msg.getBytes(Charset.forName("UTF-8"));
System.out.println("Encoding result: " + new String(encoded));
System.out.printf("Length of the byte array: %d.\n", encoded.length);
//Write a message.
out.writeByte((byte) 'F'); // magic number
out.writeInt(encoded.length); // data length
out.writeBytes(encoded); // data
}
}
The Decoder
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// Wait until the length prefix is available.
if (in.readableBytes() < 5) {
System.out.println("Insufficient bytes to start decoding. Waiting for more data...");
return;
}
in.markReaderIndex();
System.out.printf("%d bytes available for reading.\n", in.readableBytes());
// Check the magic number.
System.out.println("Checking the magic number.");
int magicNumber = in.readUnsignedByte();
if (magicNumber != 'F') {
in.resetReaderIndex();
throw new CorruptedFrameException("Invalid magic number: " + magicNumber);
}
}
}
The Client initializer
public class VerifyClientInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public VerifyClientInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc(), VerifyClient.HOST, VerifyClient.PORT));
}
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MessageEncoder());
// and then business logic.
pipeline.addLast(new VerifyClientHandler());
}
}
The Client itself
public class VerifyClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
// TODO: Change to secure TrustManager
sslCtx = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
} else {
sslCtx = null;
}
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new VerifyClientInitializer(sslCtx));
// Make a new connection.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Get the handler instance to retrieve the answer.
VerifyClientHandler handler = (VerifyClientHandler) f.channel().pipeline().last();
// Print out the answer.
// System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
// handler.authenticate();
} finally {
group.shutdownGracefully();
}
}
}
The Client handler
public class VerifyClientHandler extends SimpleChannelInboundHandler<String> {
ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Channel has become active.");
this.ctx = ctx;
String command = "obey";
sendMessage(command);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Incoming data: " + msg);
}
protected void sendMessage(String command) {
ChannelFuture future;
System.out.printf("Writing command '%s' to the channel.\n", command);
future = ctx.writeAndFlush(command);
System.out.println("Waiting for the operation to complete.");
assert future != null;
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("Operation complete.");
if (future.isSuccess()) {
System.out.println("Message sent successfully.");
} else {
future.cause().printStackTrace();
future.channel().close();
}
}
});
}
}
The output from the server is as follows:
Oct 23, 2014 4:56:59 PM io.netty.handler.logging.LoggingHandler channelRegistered
INFO: [id: 0x12d864c7] REGISTERED
Oct 23, 2014 4:56:59 PM io.netty.handler.logging.LoggingHandler bind
INFO: [id: 0x12d864c7] BIND(0.0.0.0/0.0.0.0:8322)
Oct 23, 2014 4:56:59 PM io.netty.handler.logging.LoggingHandler channelActive
INFO: [id: 0x12d864c7, /0.0.0.0:8322] ACTIVE
Oct 23, 2014 4:57:02 PM io.netty.handler.logging.LoggingHandler logMessage
INFO: [id: 0x12d864c7, /0.0.0.0:8322] RECEIVED: [id: 0x5a38547d, /127.0.0.1:62319 => /127.0.0.1:8322]
9 bytes available for reading.
Checking the magic number.
8 bytes available for reading.
Checking the magic number.
8 bytes available for reading.
Checking the magic number.
Oct 23, 2014 4:57:02 PM io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.CorruptedFrameException: Invalid magic number: 0
at org.irmacard.identity.common.MessageDecoder.decode(MessageDecoder.java:29)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:662)
Oct 23, 2014 4:57:02 PM io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.CorruptedFrameException: Invalid magic number: 0
at org.irmacard.identity.common.MessageDecoder.decode(MessageDecoder.java:29)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:205)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:233)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:219)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:769)
at io.netty.channel.AbstractChannel$AbstractUnsafe$5.run(AbstractChannel.java:567)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:662)
And from the client:
Channel has become active.
Writing command 'obey' to the channel.
Encoding 'obey' into a byte array.
Encoding result: obey
Length of the byte array: 4.
Waiting for the operation to complete.
Operation complete.
Message sent successfully.
Insufficient bytes to start decoding. Waiting for more data...
Process finished with exit code 0
I notice that there are three consecutive calls to the decode method on the server side. I've read elsewhere that this may be caused by the channel becoming inactive, but I cannot figure out where that happens. Could anyone please point me in the right direction?
Thanks!
Upvotes: 3
Views: 4127
Reputation: 106
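The decoding mechanism is explained in more detail in the Netty User Guide, in the section "Dealing with a Stream-based Transport" (in case you haven't seen it). TCP delivers a stream of bytes rather than discrete messages, so a message may arrive in several fragments. The decode method is therefore called repeatedly until all bytes of the message have been received.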
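To illustrate why a decoder may be invoked more than once per message, here is a minimal, Netty-free sketch. It assumes the frame layout from the question (one magic byte 'F', a 4-byte length, then the payload). The class FragmentDemo and its method onChunk are hypothetical names made up for this illustration; they only stand in for the buffering that Netty does internally and are not part of Netty's API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class FragmentDemo {
    // Bytes received so far (hypothetical stand-in for a decoder's cumulation buffer).
    private final ByteArrayOutputStream received = new ByteArrayOutputStream();

    /** Appends a newly arrived chunk and reports whether a whole frame is now buffered. */
    boolean onChunk(byte[] chunk) {
        received.write(chunk, 0, chunk.length);
        byte[] all = received.toByteArray();
        if (all.length < 5) {
            return false; // header (magic byte + length) is not complete yet
        }
        // The length field occupies bytes 1..4, big-endian.
        int length = ByteBuffer.wrap(all, 1, 4).getInt();
        return all.length >= 5 + length;
    }

    public static void main(String[] args) {
        FragmentDemo demo = new FragmentDemo();
        // The 9-byte frame for "obey", arriving in two fragments.
        byte[] first = {'F', 0, 0};
        byte[] second = {0, 4, 'o', 'b', 'e', 'y'};
        System.out.println(demo.onChunk(first));  // prints false
        System.out.println(demo.onChunk(second)); // prints true
    }
}
```

The first call cannot produce a message because only three bytes are available, so the decoder has to be asked again once more data arrives.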
Upvotes: 1
Reputation: 1093
I have found the solution and will post it here for future reference should anyone else run into this issue.
The Javadoc of the decode method says the following:
Decode the from one ByteBuf to an other. This method will be called till either the input ByteBuf has nothing to read when return from this method or till nothing was read from the input ByteBuf.
In other words, because my decode method consumed the magic number but neither read the rest of the frame nor added anything to the out List, it was called again on the 8 bytes that were left in the ByteBuf. By then the reader index had moved past the magic number, so the second call interpreted the first byte of the length field (0) as the magic number and threw a CorruptedFrameException. The third call came from channelInactive when the connection closed, as the second stack trace shows, and failed in the same way. The exception causes execution to halt, because I overrode the exceptionCaught method to close the context.
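For future readers, here is a minimal sketch of the complete framing logic. So that it runs without Netty on the classpath, it uses java.nio.ByteBuffer instead of Netty's ByteBuf. The class name FrameSketch and its methods are hypothetical; in a real ByteToMessageDecoder the corresponding steps would be in.readInt(), in.readBytes(...) and out.add(...). The key points are that the payload is consumed along with the header, and that the reader position is rewound when the frame is still incomplete.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FrameSketch {
    static final int HEADER_LENGTH = 5; // 1-byte magic number + 4-byte length

    /** Encodes a message as: 'F', 4-byte big-endian length, UTF-8 payload. */
    static byte[] encode(String msg) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + payload.length);
        buf.put((byte) 'F').putInt(payload.length).put(payload);
        return buf.array();
    }

    /** Decodes every complete frame and leaves an incomplete trailing frame unread. */
    static List<String> decode(ByteBuffer in) {
        List<String> out = new ArrayList<>();
        while (in.remaining() >= HEADER_LENGTH) {
            in.mark(); // remember the frame start in case we must rewind
            int magic = in.get() & 0xFF;
            if (magic != 'F') {
                in.reset();
                throw new IllegalStateException("Invalid magic number: " + magic);
            }
            int length = in.getInt();
            if (length < 0) {
                in.reset();
                throw new IllegalStateException("Negative length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // payload not fully received; wait for more data
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload); // consume the payload so the next frame starts at a magic number
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.wrap(encode("obey"));
        System.out.println(decode(in));     // prints [obey]
        System.out.println(in.remaining()); // prints 0
    }
}
```

Because every successful pass consumes a whole frame, a following pass always starts at a frame boundary and never mistakes a length byte for the magic number.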
Upvotes: 2