Reputation: 55
I have an unknown number of peers that I will need to make TCP connections to. I'm running into a few problems and not certain whether my overall approach is correct. My current set up on the client side consists of a Peer Manager that shares its EventLoopGroup and creates clients as needed:
public class PeerManagement
{
public PeerManagement()
{
// this group is shared across all clients
_eventLoopGroup = new NioEventLoopGroup();
_peers = new ConcurrentHashMap<>();
}
public void send(String s, String host)
{
// ensure that the peer exists
setPeer(host);
// look up the peer
Peer requestedPeer = _peers.get(host);
// send the request directly to the peer
requestedPeer.send(s);
}
private synchronized void setPeer(String host)
{
if (!_peers.containsKey(host))
{
// create the Peer using the EventLoopGroup & connect
Peer peer = new Peer();
peer.connect(_eventLoopGroup, host);
// add the peer to the Peer list
_peers.put(host, peer);
}
}
}
The Peer class:
public class Peer
{
private static final int PORT = 6010;
private Bootstrap _bootstrap;
private ChannelFuture _channelFuture;
public boolean connect(EventLoopGroup eventLoopGroup, String host)
{
_bootstrap = new Bootstrap();
_bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>()
{
@Override
public void initChannel(SocketChannel socketChannel) throws Exception
{
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder( 1024,0,4,0,4));
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
socketChannel.pipeline().addLast("customHandler", new CustomPeerHandler());
}
} );
// hold this for communicating with client
_channelFuture = _bootstrap.connect(host, PORT);
return _channelFuture.syncUninterruptibly().isSuccess();
}
public boolean send(String s)
{
if (_channelFuture.channel().isWritable())
{
// not the best method but String will be replaced by byte[]
ByteBuf buffer = _channelFuture.channel().alloc().buffer();
buffer.writeBytes(s.getBytes());
// NEVER returns true but the message is sent
return _channelFuture.channel().writeAndFlush(buffer).isSuccess();
}
return false;
}
}
If I send the following string "this is a test" then writeAndFlush.isSuccess() is always false but sends the message and then I get the following on the server side:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0e 74 68 69 73 20 69 73 20 61 20 74 65 |....this is a te|
|00000010| 73 74 |st |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 4278190084 - discarded
Upvotes: 0
Views: 142
Reputation: 518
The reason that writeAndFlush().isSuccess()
returns false
is that, like all outbound commands, writeAndFlush()
is asynchronous. The actual write is done in the channel's event loop thread, and this just hasn't happened yet when you call isSuccess()
in the main thread. If you want to block and wait for the write to complete you could use:
channel.writeAndFlush(msg).sync().isSuccess();
The error you see on the server side is because of this frame that arrives before your "this is a test" message:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ff 00 00 00 00 00 00 00 01 7f |.......... |
+--------+-------------------------------------------------+----------------+
The LengthFieldBasedFrameDecoder
tries to decode the first 4 bytes ff 00 00 00
as the length, which is obviously too large. Do you know what is sending this frame? Could it be your CustomPeerHandler
?
Upvotes: 1