Saboteur Kid

Reputation: 85

Netty channel fails when writing and flushing too many messages too fast

When I wrote a producer to publish messages to my server, I saw this:

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

I've searched all around and was told this happens because the channel is closed. But in my code, I only close a channel when my channel pool destroys it.
Here is my code:

public static class ChannelFactory implements PoolableObjectFactory<Channel> {

    private final Bootstrap bootstrap;
    private String host;
    private int port;

    public ChannelFactory(Bootstrap bootstrap, String host, int port) {
        this.bootstrap = bootstrap;
        this.host = host;
        this.port = port;
    }

    @Override
    public Channel makeObject() throws Exception {
        System.out.println("Create new channel!!!");
        bootstrap.validate();
        return bootstrap.connect(host, port).channel();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        ChannelFuture close = channel.close();
        if (close.isSuccess()) {
            System.out.println(channel + " close successfully");
        }
    }

    @Override
    public boolean validateObject(Channel channel) {
        System.out.println("Validate object");
        return (channel.isOpen());
    }

    @Override
    public void activateObject(Channel channel) throws Exception {
        System.out.println(channel + " is activated");
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        System.out.println(channel + " is passivated");
    }

    /**
     * @return the host
     */
    public String getHost() {
        return host;
    }

    /**
     * @param host the host to set
     * @return
     */
    public ChannelFactory setHost(String host) {
        this.host = host;
        return this;
    }

    /**
     * @return the port
     */
    public int getPort() {
        return port;
    }

    /**
     * @param port the port to set
     * @return
     */
    public ChannelFactory setPort(int port) {
        this.port = port;
        return this;
    }

}

And here is my Runner:

public static class Runner implements Runnable {

    private Channel channel;
    private ButtyMessage message;
    private MyChannelPool channelPool;

    public Runner(MyChannelPool channelPool, Channel channel, ButtyMessage message) {
        this.channel = channel;
        this.message = message;
        this.channelPool = channelPool;
    }

    @Override
    public void run() {
        channel.writeAndFlush(message.content()).syncUninterruptibly().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                channelPool.returnObject(future.channel());
            }
        });
    }

}

And my main:

public static void main(String[] args) throws InterruptedException {

    final String host = "127.0.0.1";
    final int port = 8080;
    int jobSize = 100;
    int jobNumber = 10000;
    final Bootstrap b = func(host, port);

    final MyChannelPool channelPool = new MyChannelPool(new ChannelFactory(b, host, port));

    ExecutorService threadPool = Executors.newFixedThreadPool(1);
    for (int i = 0; i < jobNumber; i++) {
        try {
            threadPool.execute(new Runner(channelPool, channelPool.borrowObject(), new ButtyMessage()));
        } catch (Exception ex) {
            System.out.println("ex = " + ex.getMessage());
        }
    }
}

Here ButtyMessage extends ByteBufHolder.
In my Runner class, if I sleep(10) after writeAndFlush, it runs quite well. But I don't want to rely on sleep, so I use a ChannelFutureListener instead, and the result is bad: if I send about 1,000 to 10,000 messages, it crashes and throws the exception above. Is there any way to avoid this?

Thanks all.
Sorry for my bad explanation and my English :)

Upvotes: 0

Views: 5669

Answers (2)

Saboteur Kid

Reputation: 85

Finally, I've found a solution for myself, but I'm still thinking about other solutions. (This solution is copied almost exactly from the 4.0.28 Netty release notes.)

    final String host = "127.0.0.1";
    final int port = 8080;
    int jobNumber = 100000;
    final EventLoopGroup group = new NioEventLoopGroup(100);

    ChannelPoolMap<InetSocketAddress, MyChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, MyChannelPool>() {

        @Override
        protected MyChannelPool newPool(InetSocketAddress key) {
            Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
            return new MyChannelPool(bootstrap, new _AbstractChannelPoolHandler());
        }
    };

    ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap1 = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {

        @Override
        protected FixedChannelPool newPool(InetSocketAddress key) {
            Bootstrap bootstrap = func(group, key.getHostName(), key.getPort());
            return new FixedChannelPool(bootstrap, new _AbstractChannelPoolHandler(), 10);
        }
    };

    final ChannelPool myChannelPool = poolMap.get(new InetSocketAddress(host, port));
    final CountDownLatch latch = new CountDownLatch(jobNumber);

    for (int i = 0; i < jobNumber; i++) {
        final int counter = i;
        final Future<Channel> future = myChannelPool.acquire();
        future.addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> f) {
                if (f.isSuccess()) {
                    Channel ch = f.getNow();
                    // Do somethings
                    ch.writeAndFlush(new ButtyMessage().content()).addListener(new ChannelFutureListener() {

                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                System.out.println("counter = " + counter);
                                System.out.println("future = " + future.channel());
                                latch.countDown();
                            }
                        }
                    });
                    // Release back to pool
                    myChannelPool.release(ch);

                } else {
                    System.out.println(f.cause().getMessage());
                    f.cause().printStackTrace();
                }
            }
        });
    }
    try {
        latch.await();
        System.exit(0);
    } catch (InterruptedException ex) {
        System.out.println("ex = " + ex.getMessage());
    }

As you can see, I use SimpleChannelPool and FixedChannelPool (an implementation of SimpleChannelPool provided by Netty).
What they do:
SimpleChannelPool: opens as many channels as it needs. With 100,000 messages this of course fails: many sockets are opened, and then IOException: Too many open files occurs. (Is that really a pool? Create as many as possible and then throw an exception? I don't call that pooling.)
FixedChannelPool: did not work in my case (still studying why; sorry for my ignorance). Indeed, I want to use an ObjectPool instead, and I may post it as soon as I finish. Thanks @Frederic Brégier for helping me so much!
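On the "too many open files" point: FixedChannelPool has a longer constructor that bounds not only the connection count but also the queue of pending acquires, and fails an acquire instead of opening an extra socket. A minimal sketch, assuming Netty 4.0.28+ on the classpath; the host, port, timeout, and pool sizes are illustrative, not taken from the question:

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;

public class BoundedPoolSketch {
    public static ChannelPool create(String host, int port) {
        Bootstrap b = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port));
        // Bound both the number of connections and the queue of pending
        // acquires, so a burst of 100,000 jobs cannot exhaust sockets.
        return new FixedChannelPool(
                b,
                new AbstractChannelPoolHandler() {
                    @Override
                    public void channelCreated(Channel ch) {
                        // install your pipeline handlers here
                    }
                },
                ChannelHealthChecker.ACTIVE,                 // validate with isActive()
                FixedChannelPool.AcquireTimeoutAction.FAIL,  // fail instead of opening more
                5000,      // give up on an acquire after 5 s of waiting
                10,        // at most 10 open channels
                100_000);  // at most 100,000 queued acquire() calls
    }
}
```

With AcquireTimeoutAction.FAIL, an acquire() that waits longer than the timeout fails its future instead of opening another socket, which is what keeps the file-descriptor count bounded.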

Upvotes: 0

Frederic Brégier

Reputation: 2206

You have several issues that could explain this. Most of them are related to incorrect usage of asynchronous operations and futures.

  • I don't know whether this is linked to your issue, but if you really want to print when the channel is actually closed, you have to wait on the future: the future from close() (or any other operation) returns immediately, without waiting for the real close. Therefore your test if (close.isSuccess()) will almost always be false.

    public void destroyObject(final Channel channel) throws Exception {
       channel.close().addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture close) {
            if (close.isSuccess()) {
               System.out.println(channel + " close successfully");
            }
          }
       });
    }
    

However, since I suppose this is only for debugging purposes, it is not mandatory.
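An alternative for this kind of debug logging, if helpful: every Netty channel exposes a closeFuture() that completes when the channel closes for any reason (our own close() or a reset from the peer), so the listener can be registered once, right after the channel is created. A small sketch; the class and method names are made up for illustration:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;

public class CloseLogging {
    // closeFuture() completes whenever the channel closes, whether we
    // called close() ourselves or the peer reset the connection.
    static void logWhenClosed(final Channel channel) {
        channel.closeFuture().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                System.out.println(f.channel() + " is now closed");
            }
        });
    }
}
```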

  • Another one: you hand back to your pool a channel that is not yet connected (which could explain why your sleep(10) helps). You have to wait on the connect().

    public Channel makeObject() throws Exception {
       System.out.println("Create new channel!!!");
       // bootstrap.validate(); // this is implicitly called in connect()
       ChannelFuture future = bootstrap.connect(host, port).awaitUninterruptibly();
       if (future.isSuccess()) {
         return future.channel();
       } else {
         // connection failed: throw so the pool never receives a dead channel
         throw new Exception("Connect to " + host + ":" + port + " failed", future.cause());
       }
    }
    
  • Third one: validating a connected channel is better done with isActive():

    @Override
    public boolean validateObject(Channel channel) {
        System.out.println("Validate object");
        return channel.isActive(); // instead of isOpen()
    }
    
  • Fourth one: in your runner, you wrongly await on the future when you should not. You can remove your syncUninterruptibly() and leave the rest as is.

    @Override
    public void run() {
channel.writeAndFlush(message.content()).addListener(new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
            channelPool.returnObject(future.channel());
         }
       });
    }
    
  • And finally, I suppose you know your test is completely sequential (one thread in your executor), so each client will reuse the very same channel over and over?
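On that last point: the question's main() uses Executors.newFixedThreadPool(1), so all 10,000 Runners execute strictly one after another. A plain JDK sketch of the same loop with a configurable thread count; the job body is stubbed out with a counter, where the real code would borrow a channel, writeAndFlush, and return it:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentRunnerSketch {
    public static int runJobs(int threads, int jobNumber) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(threads);
        final AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < jobNumber; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // stub: borrow a channel, writeAndFlush, return it to the pool
                    done.incrementAndGet();
                }
            });
        }
        threadPool.shutdown();
        threadPool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }
}
```

With more than one thread, several Runners can hold different pooled channels at the same time, which is what actually exercises the pool.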

Could you try to change the 4 points to see if it corrects your issue?


EDIT: after requester comment

For syncUninterruptibly(), I did not read carefully. If you want to block on the write, then you don't need the extra addListener, since the future is done once the sync is over. So you can call your channelPool.returnObject directly as the next statement after your sync.

So you should write it this way, simpler.

    @Override
    public void run() {
       ChannelFuture future = channel.writeAndFlush(message.content()).syncUninterruptibly();
       channelPool.returnObject(future.channel());
    }

For fireChannelActive, it will be called as soon as the connect finishes (so from makeObject, at some point in the future). Moreover, once disconnected (as you noticed in your exception), the channel is no longer usable and must be recreated from zero. So I would suggest using isActive(), such that a channel that is no longer active is removed through destroyObject...

Take a look at the channel state model here.
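If the question's Commons Pool setup is kept, note that the isActive() check in validateObject only takes effect when the pool is told to validate: with Commons Pool 1.x that means enabling testOnBorrow on a GenericObjectPool, so dead channels are routed to destroyObject instead of being handed back to callers. A sketch, assuming Commons Pool 1.6 and Netty on the classpath:

```java
import io.netty.channel.Channel;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

public class ValidatingPoolSketch {
    // With testOnBorrow on, the pool runs validateObject() (the isActive()
    // check) on every borrow and routes failing channels to destroyObject()
    // instead of handing them back to callers.
    public static GenericObjectPool<Channel> wrap(PoolableObjectFactory<Channel> factory) {
        GenericObjectPool<Channel> pool = new GenericObjectPool<Channel>(factory);
        pool.setTestOnBorrow(true);
        return pool;
    }
}
```

This would wrap the question's ChannelFactory unchanged; only the pool configuration differs.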

Upvotes: 1
