alessandro ferrucci

Reputation: 1289

Netty client sometimes doesn't receive all expected messages

I have a fairly simple test Netty server/client project. I am testing the stability of the communication by flooding the server with messages and counting the messages and bytes that come back, to make sure that everything matches.

When I run the flood from the client, the client keeps track of how many messages it sends and how many it gets back. When the two numbers are equal, it prints out some stats.

On certain occasions when running locally (I'm guessing because of congestion?) the client never ends up printing out the final message. I haven't run into this issue when the two components are on separate machines. Any suggestions would be appreciated.

The Encoder is just a simple OneToOneEncoder that encodes an Envelope type to a ChannelBuffer and the Decoder is a simple ReplayDecoder that does the opposite.

I tried adding a channelInterestChanged method to my client handler to see if the channel's interest ops were being changed to stop reading, but that did not seem to be the case.

The relevant code is below:

Thanks!

SERVER

public class Server {

    // configuration --------------------------------------------------------------------------------------------------
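    private final int port;
    private ServerChannelFactory serverFactory;
    // channelGroup is assigned in start() and used in stop(), so it needs a field declaration
    private DeviceIdAwareChannelGroup channelGroup;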
    // constructors ---------------------------------------------------------------------------------------------------

    public Server(int port) {
        this.port = port;
    }


    // public methods -------------------------------------------------------------------------------------------------
    public boolean start() {
        ExecutorService bossThreadPool = Executors.newCachedThreadPool();
        ExecutorService childThreadPool = Executors.newCachedThreadPool();

        this.serverFactory = new NioServerSocketChannelFactory(bossThreadPool, childThreadPool);
        this.channelGroup = new DeviceIdAwareChannelGroup(this + "-channelGroup");
        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", Encoder.getInstance());
                pipeline.addLast("decoder", new Decoder());
                pipeline.addLast("handler", new ServerHandler());
                return pipeline;
            }
        };

        ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        Channel channel = bootstrap.bind(new InetSocketAddress(this.port));
        if (!channel.isBound()) {
            this.stop();
            return false;
        }

        this.channelGroup.add(channel);
        return true;
    }

    public void stop() {
        if (this.channelGroup != null) {
            ChannelGroupFuture channelGroupCloseFuture = this.channelGroup.close();
            System.out.println("waiting for ChannelGroup shutdown...");
            channelGroupCloseFuture.awaitUninterruptibly();
        }
        if (this.serverFactory != null) {
            this.serverFactory.releaseExternalResources();
        }
    }

    // main -----------------------------------------------------------------------------------------------------------
    public static void main(String[] args) {
        int port;
        if (args.length != 3) {
            System.out.println("No arguments found using default values");
            port = 9999;
        } else {
            port = Integer.parseInt(args[1]);
        }

        final Server server = new Server( port);

        if (!server.start()) {
            System.exit(-1);
        }
        System.out.println("Server started on port " + port + " ... ");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                server.stop();
            }
        });
    }
}

SERVER HANDLER

public class ServerHandler extends SimpleChannelUpstreamHandler {

    // internal vars --------------------------------------------------------------------------------------------------

    private AtomicInteger numMessagesReceived=new AtomicInteger(0);

    // constructors ---------------------------------------------------------------------------------------------------
    public ServerHandler() {
    }

    // SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel c = e.getChannel();
        System.out.println("ChannelConnected: channel id: " + c.getId() + ", remote host: " + c.getRemoteAddress() + ", isChannelConnected(): " + c.isConnected());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("*** EXCEPTION CAUGHT!!! ***");
        e.getChannel().close();
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("*** CHANNEL DISCONNECTED ***");

    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if(numMessagesReceived.incrementAndGet()%1000==0 ){
             System.out.println("["+numMessagesReceived+"-TH MSG]: Received message: " + e.getMessage());
        }

        if (e.getMessage() instanceof Envelope) {
                // echo it...
                if (e.getChannel().isWritable()) {
                    e.getChannel().write(e.getMessage());
                }
        } else {
            super.messageReceived(ctx, e);
        }
    }
}

CLIENT

public class Client implements ClientHandlerListener {

    // configuration --------------------------------------------------------------------------------------------------
    private final String host;
    private final int port;
    private final int messages;
    // internal vars --------------------------------------------------------------------------------------------------
    private ChannelFactory clientFactory;
    private ChannelGroup channelGroup;
    private ClientHandler handler;
    private final AtomicInteger received;
    private long startTime;
    private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    // constructors ---------------------------------------------------------------------------------------------------
    public Client(String host, int port, int messages) {
        this.host = host;
        this.port = port;
        this.messages = messages;
        this.received = new AtomicInteger(0);
    }

    // ClientHandlerListener ------------------------------------------------------------------------------------------
    @Override
    public void messageReceived(Envelope message) {
        if (this.received.incrementAndGet() == this.messages) {
            long stopTime = System.currentTimeMillis();
            float timeInSeconds = (stopTime - this.startTime) / 1000f;
            System.err.println("Sent and received " + this.messages + " in " + timeInSeconds + "s");
            System.err.println("That's " + (this.messages / timeInSeconds) + " echoes per second!");
        }
    }

    // public methods -------------------------------------------------------------------------------------------------
    public boolean start() {

        // For production scenarios, use limited sized thread pools
        this.clientFactory = new NioClientSocketChannelFactory(cachedThreadPool, cachedThreadPool);
        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");
        this.handler = new ClientHandler(this, this.channelGroup);
        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("byteCounter", new ByteCounter("clientByteCounter"));
                pipeline.addLast("encoder", Encoder.getInstance());
                pipeline.addLast("decoder", new Decoder());
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        };

        ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        boolean connected = bootstrap.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().isSuccess();
        System.out.println("isConnected: " + connected);
        if (!connected) {
            this.stop();
        }

        return connected;
    }

    public void stop() {
        if (this.channelGroup != null) {
            this.channelGroup.close();
        }
        if (this.clientFactory != null) {
            this.clientFactory.releaseExternalResources();
        }
    }

    public ChannelFuture sendMessage(Envelope env) {
        Channel ch = this.channelGroup.iterator().next();
        ChannelFuture cf = ch.write(env);
        return cf;
    }

    private void flood() {
        if ((this.channelGroup == null) || (this.clientFactory == null)) {
            return;
        }

        System.out.println("sending " + this.messages + " messages");
        this.startTime = System.currentTimeMillis();
        for (int i = 0; i < this.messages; i++) {

            this.handler.sendMessage(new Envelope(Version.VERSION1, Type.REQUEST, 1, new byte[1]));
        }
    }
    // main -----------------------------------------------------------------------------------------------------------

    public static void main(String[] args) throws InterruptedException {
        final Client client = new Client("localhost", 9999, 10000);

        if (!client.start()) {
            System.exit(-1);
            return;
        }
        while (client.channelGroup.size() == 0) {
            Thread.sleep(200);
        }
        System.out.println("Client started...");

        client.flood();


        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("shutting down client");
                client.stop();
            }
        });


    }
}

CLIENT HANDLER

public class ClientHandler extends SimpleChannelUpstreamHandler {
    // internal vars --------------------------------------------------------------------------------------------------
    private final ClientHandlerListener listener;
    private final ChannelGroup channelGroup;
    private Channel channel;

    // constructors ---------------------------------------------------------------------------------------------------
    public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup) {
        this.listener = listener;
        this.channelGroup = channelGroup;
    }

    // SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (e.getMessage() instanceof Envelope) {
            Envelope env = (Envelope) e.getMessage();
            this.listener.messageReceived(env);
        } else {
            System.out.println("NOT ENVELOPE!!");
            super.messageReceived(ctx, e);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("**** CAUGHT EXCEPTION CLOSING CHANNEL ***");
        e.getCause().printStackTrace();
        e.getChannel().close();
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.channel = e.getChannel();
        System.out.println("Server connected, channel id: " + this.channel.getId());
        this.channelGroup.add(e.getChannel());
    }

    // public methods -------------------------------------------------------------------------------------------------
    public void sendMessage(Envelope envelope) {
        if (this.channel != null) {
            this.channel.write(envelope);
        }
    }
}

CLIENT HANDLER LISTENER INTERFACE

public interface ClientHandlerListener {

    void messageReceived(Envelope message);
}

Upvotes: 0

Views: 2933

Answers (1)

johnstlr

Reputation: 1431

Without knowing how large each Envelope is on the wire, my guess is that your client writes 10,000 messages without checking whether the channel is writable.

Netty 3.x processes network events and writes in a particular fashion. It's possible that your client is writing so much data so fast that Netty isn't getting a chance to process receive events. On the server side this would result in the channel becoming non-writable, and because your ServerHandler only echoes when isWritable() returns true, the reply is silently dropped.
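To make the dropped-reply effect concrete, here is a small plain-Java model. It is not Netty code: the class name EchoDropModel, the high-water mark of 64, and the "one reply flushed per N messages" drain rate are all assumptions chosen for the illustration. It only mimics a handler that skips the echo whenever the outbound buffer is full.

```java
final class EchoDropModel {

    /**
     * Counts how many of `sent` messages are echoed when the reply is
     * skipped while the outbound buffer is at or above the high-water mark.
     * One queued reply is flushed after every `drainEvery` incoming messages.
     */
    static int echoedCount(int sent, int highWaterMark, int drainEvery) {
        int pending = 0; // replies queued but not yet flushed to the socket
        int echoed = 0;
        for (int i = 1; i <= sent; i++) {
            // Stands in for the isWritable() check in the server handler.
            boolean writable = pending < highWaterMark;
            if (writable) {
                pending++;
                echoed++;
            }
            // When not writable, the reply is dropped and never retried.
            if (i % drainEvery == 0 && pending > 0) {
                pending--; // the socket accepts one queued reply
            }
        }
        return echoed;
    }

    public static void main(String[] args) {
        int sent = 10_000;
        int echoed = echoedCount(sent, 64, 2);
        System.out.println("sent=" + sent + " echoed=" + echoed
                + " dropped=" + (sent - echoed));
    }
}
```

When replies are flushed as fast as messages arrive (drainEvery of 1), nothing is dropped. As soon as messages arrive faster than replies drain, the buffer fills and the echoed count falls short of the sent count, which is the situation where the client's counter never reaches its target.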

There are a few possible reasons why you see the problem only on localhost, but the most likely one is that loopback write bandwidth is much higher than your network bandwidth. The client doesn't check whether the channel is writable, so over a real network your messages are buffered by Netty until the network can catch up (if you wrote significantly more than 10,000 messages you might see an OutOfMemoryError). This acts as a natural brake, because Netty suspends writing until the network is ready, which gives it time to process incoming data and keeps the server from ever seeing a channel that is not writable.

The DiscardClientHandler in Netty's discard example shows how to test whether the channel is writable, and how to resume when it becomes writable again. Another option is to have sendMessage return the ChannelFuture associated with the write and, if the channel is not writable after the write, block until the future completes.
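As a rough sketch of the "block until there is room" idea, the following plain-Java model uses a bounded queue in place of the channel's outbound buffer. Nothing here is Netty API: ThrottledFloodModel, flood, and the capacity value are hypothetical names and numbers for the illustration. In real Netty 3 code the blocking step would be waiting on the ChannelFuture returned by write (for example with awaitUninterruptibly()) while the channel reports that it is not writable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class ThrottledFloodModel {

    /**
     * Sends `messages` items through a bounded buffer of size `capacity`
     * and returns how many the "network" thread received.
     */
    static int flood(int messages, int capacity) {
        BlockingQueue<Integer> outbound = new ArrayBlockingQueue<>(capacity);
        int[] delivered = {0};

        // Stands in for the socket draining the outbound buffer.
        Thread network = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    outbound.take();
                    delivered[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        network.start();

        try {
            for (int i = 0; i < messages; i++) {
                // put() blocks while the buffer is full, so the sender
                // can never get further ahead than `capacity` messages.
                outbound.put(i);
            }
            // join() also makes the final value of delivered[0] visible here.
            network.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while flooding", e);
        }
        return delivered[0];
    }

    public static void main(String[] args) {
        System.out.println("delivered=" + flood(10_000, 16));
    }
}
```

The point of the design is that the sender waits instead of dropping or buffering without limit, so every message is delivered and memory use stays bounded by the buffer capacity.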

Also, your server handler should write the message first and then check whether the channel is writable. If it isn't, call setReadable(false) on the channel to stop reading. Netty will call channelInterestChanged when the channel becomes writable again, and at that point you can call setReadable(true) to resume reading messages.
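The write-then-pause behaviour can be sketched as a small single-threaded model. Again this is plain Java rather than Netty code, and PausingEchoModel plus the two water-mark parameters are assumptions made for the illustration. The `readable` flag stands in for the channel's read interest, and the point where it flips back to true plays the role of the channelInterestChanged callback.

```java
final class PausingEchoModel {

    /**
     * Echoes `sent` messages, pausing reads when the outbound buffer
     * reaches `highWaterMark` and resuming once it drains to
     * `lowWaterMark`. Returns the number of messages echoed.
     */
    static int echoedCount(int sent, int highWaterMark, int lowWaterMark) {
        if (lowWaterMark < 0 || highWaterMark <= lowWaterMark) {
            throw new IllegalArgumentException(
                    "need highWaterMark > lowWaterMark >= 0");
        }
        int unread = sent;       // messages still waiting to be read
        int pending = 0;         // replies queued but not yet flushed
        int echoed = 0;
        boolean readable = true; // stands in for the channel's read interest

        while (unread > 0 || pending > 0) {
            if (readable && unread > 0) {
                unread--;
                pending++;       // always write the echo first...
                echoed++;
                if (pending >= highWaterMark) {
                    readable = false; // ...then stop reading if the buffer is full
                }
            } else {
                pending--;       // the socket flushes one queued reply
                if (!readable && pending <= lowWaterMark) {
                    readable = true;  // writable again: resume reading
                }
            }
        }
        return echoed;
    }

    public static void main(String[] args) {
        System.out.println("echoed=" + echoedCount(10_000, 64, 32));
    }
}
```

Because unread messages simply wait while reading is paused, no reply is ever skipped: the echoed count always equals the number sent, regardless of how small the buffer is.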

Upvotes: 1
