Hans

Reputation: 55

Netty 4.0: Spawning off, maintaining and communicating with many clients/peers (TCP)

I have an unknown number of peers that I need to make TCP connections to. I'm running into a few problems and am not certain whether my overall approach is correct. My current setup on the client side consists of a Peer Manager that shares its EventLoopGroup and creates clients as needed:

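public class PeerManagement
{
  // field declarations were missing from the snippet; types inferred from usage
  private final EventLoopGroup _eventLoopGroup;
  private final Map<String, Peer> _peers;

  public PeerManagement()
  {
   // this group is shared across all clients
   _eventLoopGroup = new NioEventLoopGroup();
   _peers = new ConcurrentHashMap<>();
  }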

  public void send(String s, String host)
  {
   // ensure that the peer exists
   setPeer(host);

   // look up the peer
   Peer requestedPeer = _peers.get(host);

   // send the request directly to the peer
   requestedPeer.send(s);
  }

  private synchronized void setPeer(String host)
  {
    if (!_peers.containsKey(host))
    {
     // create the Peer using the EventLoopGroup & connect
     Peer peer = new Peer();
     peer.connect(_eventLoopGroup, host);
     // add the peer to the Peer list
     _peers.put(host, peer);
    }
  }
}

The Peer class:

public class Peer
{
  private static final int PORT = 6010;

  private Bootstrap _bootstrap;
  private ChannelFuture _channelFuture;

  public boolean connect(EventLoopGroup eventLoopGroup, String host)
  {
    _bootstrap = new Bootstrap();
    _bootstrap.group(eventLoopGroup)
      .channel(NioSocketChannel.class)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .handler(new ChannelInitializer<SocketChannel>()
      {
        @Override
        public void initChannel(SocketChannel socketChannel) throws Exception
        {
          socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
          socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
          socketChannel.pipeline().addLast("customHandler", new CustomPeerHandler());
        }
      });

   // hold this for communicating with client
   _channelFuture = _bootstrap.connect(host, PORT);
   return _channelFuture.syncUninterruptibly().isSuccess();
  }

  public boolean send(String s)
  {
   if (_channelFuture.channel().isWritable())
   {
    // not the best method but String will be replaced by byte[]
    ByteBuf buffer = _channelFuture.channel().alloc().buffer();
    buffer.writeBytes(s.getBytes());

    // NEVER returns true but the message is sent
    return _channelFuture.channel().writeAndFlush(buffer).isSuccess();
   }
   return false;
  }

}

If I send the string "this is a test", writeAndFlush(...).isSuccess() always returns false even though the message is sent, and I get the following on the server side:

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f                   |..........      |
+--------+-------------------------------------------------+----------------+

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0e 74 68 69 73 20 69 73 20 61 20 74 65 |....this is a te|
|00000010| 73 74                                           |st              |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f                   |..........      |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded

Upvotes: 0

Views: 142

Answers (1)

Ben Evans

Reputation: 518

writeAndFlush().isSuccess() returns false because writeAndFlush(), like all outbound operations, is asynchronous. The actual write is done in the channel's event loop thread, and it simply hasn't happened yet when you call isSuccess() in the main thread. If you want to block and wait for the write to complete, you could use the line below. Note that sync() rethrows the cause if the write failed, and it must not be called from the event loop thread itself:

channel.writeAndFlush(msg).sync().isSuccess();
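To illustrate the timing issue without a Netty dependency, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for the ChannelFuture returned by writeAndFlush(). It is an analogy only, not Netty code: the class name AsyncResultDemo, the method run(), and the latch that holds the "write" back are all assumptions made for the demonstration. In Netty itself, the non-blocking alternative to sync() is to register a listener on the returned future with ChannelFuture.addListener(...).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class AsyncResultDemo {

    // Returns { completion state checked immediately, result after waiting }.
    static boolean[] run() {
        // The latch keeps the background task from finishing until we release it,
        // standing in for a write that the event loop has not processed yet.
        CountDownLatch release = new CountDownLatch(1);

        CompletableFuture<Boolean> write = CompletableFuture.supplyAsync(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            return true; // the "write" succeeded
        });

        // Checked right after submitting: the task cannot have completed yet.
        boolean doneImmediately = write.isDone();

        // Let the task proceed, then block until it completes.
        release.countDown();
        boolean resultAfterWaiting = write.join();

        return new boolean[] { doneImmediately, resultAfterWaiting };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("checked immediately: " + r[0]);
        System.out.println("after waiting:       " + r[1]);
    }
}
```

The first value corresponds to what the question's send() method observes; the second corresponds to checking the result only once the operation has completed.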

The error you see on the server side is because of this frame that arrives before your "this is a test" message:

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f                   |..........      |
+--------+-------------------------------------------------+----------------+

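The LengthFieldBasedFrameDecoder decodes the first 4 bytes, ff 00 00 00, as the length field. That is 4278190080, and adding the 4 bytes of the length field itself gives the 4278190084 reported in the exception, far above the 1024-byte maximum you configured. Do you know what is sending this frame? Could it be your CustomPeerHandler?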
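The arithmetic can be checked with plain JDK code. The sketch below is not Netty's implementation; it only mirrors the calculation for the settings used in the question (length field at offset 0, 4 bytes wide, big-endian, no length adjustment). The class name FrameLengthCheck and the method adjustedFrameLength are hypothetical.

```java
import java.nio.ByteBuffer;

public class FrameLengthCheck {

    // Reads a 4-byte big-endian unsigned length at offset 0 and adds the
    // 4 bytes occupied by the length field itself.
    static long adjustedFrameLength(byte[] header) {
        long length = ByteBuffer.wrap(header, 0, 4).getInt() & 0xFFFFFFFFL;
        return length + 4;
    }

    public static void main(String[] args) {
        // First bytes of the unexpected frame from the server log
        byte[] stray = { (byte) 0xff, 0x00, 0x00, 0x00 };
        // First bytes of the "this is a test" frame (00 00 00 0e)
        byte[] expected = { 0x00, 0x00, 0x00, 0x0e };

        System.out.println(adjustedFrameLength(stray));    // 4278190084
        System.out.println(adjustedFrameLength(expected)); // 18
    }
}
```

The second value, 18, is the 14-byte payload plus its 4-byte prefix, which is well within the 1024-byte limit. Only the unexpected 10-byte frame triggers the TooLongFrameException.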

Upvotes: 1
