xinglu
xinglu

Reputation: 85

netty: why code in handler can not use future.await()?

every body, hello!

I use netty 3.1 to build a socket dispatch server which transfer socket data to another socket server, so I create a client connect in netty sever handler when first message arrived and wait unit the connect is complete, when next messageRecv event arrives, I just transfer the buffer from server channel to client channel. But I find it is forbidden in handler when using future.await*() operation. If I not use await(), because the connectFuture is sync, there is a chance that when the next message arrive, but the conenct is not complete. I don't konw how to deal the issue.

How can I make sure the client connect is complete before next messageRecv event arrived ?

Right now, I just make a lock to synchronize two code, just like this:

/**
 * server handler
*/
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {

  private static Logger _logger = LoggerFactory.getLogger(cn.szboc.dispatch.server.netty.ServerChannelHandler.class);

  public ServerChannelHandler(ProxyClientFactory clientFactory) {
    this.clientFactory = clientFactory;
  }

/** factory connect another server */
private ProxyClientFactory clientFactory;

/** anotherchannel */
private Channel innerChannel;

private ChannelFuture connectFuture;


private ReentrantLock connectLock = new ReentrantLock();

private Condition notComplete = connectLock.newCondition();

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

    final ChannelBuffer buffer = ((ChannelBuffer) e.getMessage()).copy();
    final Channel outChannel = ctx.getChannel();

    // first connect
    if (connectFuture == null) {

        final ClientChannelHandler cch = new ClientChannelHandler(ctx.getChannel());

        ProxyClient client = clientFactory.retrieveClient();


        connectFuture = client.getConnectChannelFuture();


        connectFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {

                connectLock.lock();
                try {

                    if (future.isSuccess()) {
                        innerChannel = future.getChannel();

                        innerChannel.getPipeline().addLast("clientchannelhandler", cch);
                        innerChannel.write(buffer);
                    } else {

                        Channels.fireExceptionCaught(outChannel, future.getCause());
                    }
                } finally {

                    notComplete.signal();

                    connectLock.unlock();
                }
            }

        });

    } else {

        connectLock.lock();
        try {

            if (!connectFuture.isDone()) {

                if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {

                    throw new Exception("");
                }
            }


            if (connectFuture.isSuccess()) {
                if(innerChannel == null){
                    if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {

                        throw new Exception("");
                    }
                }
                innerChannel.write(buffer);
            } else {

                _logger.error("");
            }

        } finally {
            connectLock.unlock();
        }

    }

}

Upvotes: 1

Views: 2372

Answers (1)

Norman Maurer
Norman Maurer

Reputation: 23567

You can't because you could deadlock netty. You would also block the IO-Worker thread which is a bad thing. The best way to handle your situation is to "queue" the messages until the connect is complete and then dispatch them.

An other solution would be to connect to the proxy client on the "channelOpen(..)" method while set Channel.setReadable(false) before. After the connect is done you would then call Channel.setReadable(true) again so you will get messageEvents processed.

Something like this:

    @Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
        throws Exception {
    // Suspend incoming traffic until connected to the remote host.
    final Channel inboundChannel = e.getChannel();
    inboundChannel.setReadable(false);

    // Start the connection attempt.
    ClientBootstrap cb = new ClientBootstrap(cf);
    cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));

    outboundChannel = f.getChannel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // Connection attempt succeeded:
                // Begin to accept incoming traffic.
                inboundChannel.setReadable(true);
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

See the proxy example for more details [1].

[1] https://github.com/netty/netty/tree/3.2/src/main/java/org/jboss/netty/example/proxy

Upvotes: 1

Related Questions