user3211694
user3211694

Reputation: 1

Netty Decoding Error

I'm trying to write a simple TCP server/client system with packets. When the client becomes active, I have it send 1 packet to the server and the server receives it fine, but then the client throws the exception (shown below) and I'm not sure why. The client shouldn't be receiving any data back.

Client

    public class Client {

    public static void main(String[] args) throws Exception {
        new Client("localhost", 8000);
    }

    private Channel channel;

    public Client(final String host, final int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            new Bootstrap()
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("decoder", new PacketDecoder());
                    ch.pipeline().addLast("encoder", new PacketEncoder());
                    ch.pipeline().addLast(new Handler());
                }
            })
            .connect(host, port).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class Handler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            channel = ctx.channel();
            System.out.println("Connected");
            //channel.writeAndFlush(new SimplePacket(25));
            channel.writeAndFlush(new SimplePacket(50));
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
            System.out.println("Disconnected");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {       
            Packet packet = (Packet) msg;
            System.out.println("Received packet: " + packet.getId() + " | " + packet.toString());
        }

        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

    }

}

Server

    public class Server {

    public static void main(String[] args) throws Exception {
        new Server(8000);
    }

    private final Set<Channel> channels = new HashSet<Channel>();

    public Server(final int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("decoder", new PacketDecoder());
                    ch.pipeline().addLast("encoder", new PacketEncoder());
                    ch.pipeline().addLast(new Handler());
                }
            });         
            b.bind(port)
            .addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {           
                        System.out.println("Listening on " + port);
                        } else {
                        System.out.println("Could not bind to host");
                    }
                }
            })
            .sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class Handler extends ChannelInboundHandlerAdapter {

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            channels.add(ctx.channel());
            System.out.println("Client connected [" + channels.size() + "]: " + ctx.channel().remoteAddress()); 
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            channels.remove(ctx.channel());
            System.out.println("Client disconnected [" + channels.size() + "]: " + ctx.channel().remoteAddress());
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Packet packet = (Packet) msg;
            System.out.println("Received packet: " + packet.getId() + " | " + packet.toString());
        }

        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

    }

}

Packet Decoder

public class PacketDecoder extends ByteToMessageDecoder {

    private final PacketManager packetManager;

    public PacketDecoder(PacketManager packetManager) {
        this.packetManager = packetManager;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {  


if (buf.readableBytes() < 4) {
            return;
        }
        while (buf.readableBytes() != 0) {
            int id = buf.readInt();
            if (buf.readableBytes() != 0) { 
                if (packetManager.isRegistered(id)) {
                    Packet packet = packetManager.getPacket(id);
                    packet.read(buf);
                    out.add(packet);
                    } else {
                    buf.skipBytes(buf.readableBytes());
                    throw new DataException("Cannot receive unregistered packet: " + id);
                }
            }
        }
    }

}

Packet Encoder

    public class PacketEncoder extends MessageToByteEncoder<Packet> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) throws Exception {
        buf.writeInt(packet.getId());
        packet.write(buf);
    }

}

SimplePacket class

public class SimplePacket extends Packet {

    private int data;

    public SimplePacket(int data) {
        this.data = data;
    }

    public SimplePacket() {

    }

    @Override
    public void read(ByteBuf buf) {
        data = buf.readInt();
    }   

    @Override
    public void write(ByteBuf buf) {
        buf.writeInt(data);
    }

    @Override
    public int getId() {
        return 1000;
    }

    @Override
    public String toString() {
        return "{" + data + "}";
    }

}

Exception

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(78) + length(4) exceeds writerIndex(80): UnpooledUnsafeDirectByteBuf(ridx: 78, widx: 80, cap: 80)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:257)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(78) + length(4) exceeds writerIndex(80): UnpooledUnsafeDirectByteBuf(ridx: 78, widx: 80, cap: 80)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1161)
    at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:612)
    at dataserver.packet.Packet.readString(Packet.java:21)
    at dataserver.packet.packets.SequencePacket.read(SequencePacket.java:74)
    at dataserver.packet.codec.PacketDecoder.decode(PacketDecoder.java:27)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
    ... 10 more
io.netty.handler.codec.DecoderException: dataserver.DataException: Cannot receive unregistered packet: 2
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:257)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
    at java.lang.Thread.run(Thread.java:724)
Caused by: dataserver.DataException: Cannot receive unregistered packet: 2
    at dataserver.packet.codec.PacketDecoder.decode(PacketDecoder.java:31)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
    ... 10 more

New packet I'm using (Can vary in data size)

public class SequencePacket extends Packet {

private static final Map<Character, Class<? extends Object>> types = new HashMap<Character, Class<? extends Object>>();

static {
    types.put('b', Byte.class);
    types.put('f', Float.class);
    types.put('d', Double.class);
    types.put('s', Short.class);
    types.put('i', Integer.class);
    types.put('l', Long.class);
    types.put('c', Character.class);
    types.put('S', String.class);
    types.put('B', Boolean.class);
}

private final List<Object> data = new ArrayList<Object>();

public SequencePacket() {

}

public SequencePacket(Object...objects) {
    for (Object object : objects) {
        write(object);
    }
}

@Override
public void read(ByteBuf buf) {
    String sequence = Packet.readString(buf).trim();
    System.out.println("Sequence: " + sequence);
    char[] split = sequence.toCharArray();
    for (int i = 0; i < split.length; i++) {
        char c = split[i];
        if (!types.containsKey(c)) {
            throw new DataException("Bad sequence character in " + sequence + ": " + c);
        }
        switch (c) {
            case 'b':
                data.add(buf.readByte());
                break;
            case 'f':
                data.add(buf.readFloat());
                break;
            case 'd':
                data.add(buf.readDouble());
                break;
            case 's':
                data.add(buf.readShort());
                break;
            case 'i':
                data.add(buf.readInt());
                break;
            case 'l':
                data.add(buf.readLong());
                break;
            case 'c':
                data.add(buf.readChar());
                break;
            case 'S':
                data.add(Packet.readString(buf));
                break;
            case 'B':
                data.add(buf.readBoolean());
                break;
        }
    }
}

@Override
public void write(ByteBuf buf) {
    StringBuilder sequence = new StringBuilder();
    for (Object object : data) {
        sequence.append(getType(object.getClass()));
    }
    Packet.writeString(buf, sequence.toString());
    for (Object object : data) {
        switch (getType(object.getClass())) {
            case 'b':
                buf.writeByte((Byte) object);
                break;
            case 'f':
                buf.writeFloat((Float) object);
                break;
            case 'd':
                buf.writeDouble((Double) object);
                break;
            case 's':
                buf.writeShort((Short) object);
                break;
            case 'i':
                buf.writeInt((Integer) object);
                break;
            case 'l':
                buf.writeLong((Long) object);
                break;
            case 'c':
                buf.writeChar((Character) object);
                break;
            case 'S':
                Packet.writeString(buf, (String) object);
                break;
            case 'B':
                buf.writeBoolean((Boolean) object);
                break;
        }
    }
}

@Override
public int getId() {
    return 0;
}

public SequencePacket write(Object o) {
    if (!types.containsValue(o.getClass())) {
        throw new DataException("Cannot add object type to sequence: " + o.getClass().getSimpleName());
    }
    data.add(o);
    return this;
}

public byte getByte(int index) {
    return (Byte) data.get(index);
}

public float getFloat(int index) {
    return (Float) data.get(index);
}

public double getDouble(int index) {
    return (Double) data.get(index);
}

public short getShort(int index) {
    return (Short) data.get(index);
}

public int getInt(int index) {
    return (Integer) data.get(index);
}

public long getLong(int index) {
    return (Long) data.get(index);
}

public char getChar(int index) {
    return (Character) data.get(index);
}

public String getString(int index) {
    return data.get(index).toString();
}

public boolean getBoolean(int index) {
    return (Boolean) data.get(index);
}

public Object getObject(int index) {
    return data.get(index);
}

public boolean isByte(int index) {
    return data.get(index).getClass() == Byte.class;
}

public boolean isFloat(int index) {
    return data.get(index).getClass() == Float.class;
}

public boolean isDouble(int index) {
    return data.get(index).getClass() == Double.class;
}

public boolean isShort(int index) {
    return data.get(index).getClass() == Short.class;
}

public boolean isInt(int index) {
    return data.get(index).getClass() == Integer.class;
}

public boolean isLong(int index) {
    return data.get(index).getClass() == Long.class;
}

public boolean isChar(int index) {
    return data.get(index).getClass() == Character.class;
}

public boolean isString(int index) {
    return data.get(index).getClass() == String.class;
}

public boolean isBoolean(int index) {
    return data.get(index).getClass() == Boolean.class;
}

public List<Object> getAll() {
    return data;
}

public int size() {
    return data.size();
}

public boolean isEmpty() {
    return data.isEmpty();
}

public SequencePacket clear() {
    data.clear();
    return this;
}

public boolean hasIndex(int index) {
    return index >= 0 && index < data.size();
}

public Class<? extends Object> getClass(int index) {
    return data.get(index).getClass();
}

private char getType(Class<? extends Object> clazz) {
    char c = ' ';
    for (Entry<Character, Class<? extends Object>> entry : types.entrySet()) {
        if (entry.getValue() == clazz) {
            c = entry.getKey();
            break;
        }
    }
    if (c == ' ') {
        throw new DataException("Could not find type in sequence: " + clazz.getSimpleName());
    }
    return c;
}

@Override
public String toString() {
    StringBuilder result = new StringBuilder();
    result.append("{");
    if (data != null) {
        for (Object object : data) {
            result.append(object.toString());
            result.append(", ");
        }
        if (result.length() > 2) {
            result.setLength(result.length() - 2);
        }
    }
    result.append("}");
    return result.toString();
}

}

Upvotes: 0

Views: 7197

Answers (1)

Yazan Jaber
Yazan Jaber

Reputation: 2078

Whenever the channel is inactive decode will be called this is the default behavior inherited from ByteToMessageDecoder, to solve this you'll have to check for empty buffer in PacketDecoder.decode and if the buffer is empty simply return.

Exception 2 is happening because in your code here is the logic:

  1. Create a SimplePacket
  2. Make SimplePacket read one int from buffer
  3. if there is a data remaining in the buffer throw exception
  4. if no data remaining add packet to out list

And since your sending two SimplePacket from the client the exception will surly be thrown. to fix this you have to create SimplePacket in the server in a loop:

while(buf.readableBytes() != 0){
    Packet packet = new SimplePacket();
    packet.read(buf);
    out.add(packet);
}

In TCP packets sent will be reliably sent and will be received in same order but not necessary in same chunk sizes as they were sent, if you don't start thinking in this way you'll keep introducing bugs in the code. the exception, your last I hope :), is probably caused by the fact that you start reading bytes off the buffer in the code below with no regard of how much bytes are actually available. it is not enough to check if the buffer has data, you'll also need to check if it has enough data. And until it has enough data you don't read anything you just let the data accumulate in the buffer until the size of the buffer is greater or equal to the least amount of parse-able bytes in your protocol.

while (buf.readableBytes() != 0) {
            int id = buf.readInt();
            if (packetManager.isRegistered(id)) {
                Packet packet = packetManager.getPacket(id);
                packet.read(buf);
                out.add(packet);
                } else {
                buf.skipBytes(buf.readableBytes());
                throw new DataException("Cannot receive unregistered packet: " + id);
            }
        }

Upvotes: 2

Related Questions