Reputation: 13
I wrote a server that sends a large number of messages to every client once it has connected.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    while (true) {
        content = arrayblockqueue.poll();
        ctx.writeAndFlush(content + "\r\n");
    }
}
After sending thousands of messages, the channel stops sending anything. Through debugging, I found that AbstractNioByteChannel.incompleteWrite was invoked and that SelectionKey.OP_WRITE was added to the selectionKey when the network was congested. Once OP_WRITE is set, AbstractNioUnsafe.isFlushPending() returns true, so flush() does not actually write anything. How can I make Netty recover from this situation? Or am I using Netty the wrong way?
Upvotes: 1
Views: 996
Reputation: 12351
Your handler method is invoked directly from an I/O thread. Until your handler method returns, the I/O thread that called it cannot perform any I/O, and that is why you do not see anything being written.
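To make the effect concrete, here is a minimal sketch that uses only JDK classes, not Netty itself. A single-threaded executor stands in for the channel's I/O thread, and the names `BlockedLoopDemo` and `flushRuns` are made up for this illustration. It shows that work queued behind a task that never returns does not get a chance to run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a plain-JDK analogy, not Netty code.
class BlockedLoopDemo {

    /** Returns true if the queued "flush" task ran within 500 ms. */
    static boolean flushRuns(boolean handlerBlocks) throws Exception {
        // One thread stands in for the I/O thread that serves a channel.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            if (handlerBlocks) {
                // Stand-in for a handler method that never returns.
                ioThread.submit(() -> {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Stand-in for the pending write; it is queued on the same thread.
            Future<?> flush = ioThread.submit(() -> { });
            try {
                flush.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        } finally {
            // Interrupts the sleeping task so the thread can exit.
            ioThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("handler returns: flush ran = " + flushRuns(false));
        System.out.println("handler blocks:  flush ran = " + flushRuns(true));
    }
}
```

In the blocking case the second task stays in the queue, which is the same reason a `while (true)` loop inside `channelActive` keeps the channel from making progress.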
Judging from your code, what you want is to take a message from a blocking queue and write it to a channel. Instead of using a blocking queue, you can write to the channel directly from another thread. Almost all operations in Netty are thread safe. For example:
public static void main(String[] args) throws Exception {
    ...
    Channel ch = ...;
    for (int i = 0; i < 1000000; i++) {
        ch.writeAndFlush(String.valueOf(i) + "\r\n");
    }
    ...
}
// And your handler doesn't need an ArrayBlockingQueue.
However, the code above will probably make Netty's queue of pending writes grow without bound, eventually resulting in an OutOfMemoryError. To prevent write requests from piling up indefinitely, you have to use the future returned by the writeAndFlush() operation.
for (int i = 0; i < 1000000; i++) {
    ChannelFuture f = ch.writeAndFlush(String.valueOf(i) + "\r\n");
    if ((i + 1) % 100 == 0) {
        // Wait until the write request is actually finished
        // so that the event queue becomes empty.
        f.sync();
    }
}
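Calling sync() on every 100th write is one way to bound the backlog. A variation on the same idea is to cap the number of outstanding writes with a semaphore that is released when each write completes. The sketch below models this with JDK classes only: a single-threaded executor plays the I/O thread and a `CompletableFuture` plays the write future. `BoundedWritesDemo`, `send`, and `maxInFlight` are hypothetical names, and this is an alternative technique, not what the loop above does. With Netty, the release would presumably go into a listener on the future returned by writeAndFlush().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: models bounded in-flight writes without Netty.
class BoundedWritesDemo {

    /** Issues `total` simulated writes and returns the peak number pending at once. */
    static int send(int total, int maxInFlight) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger pending = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the producer while maxInFlight writes are outstanding.
                permits.acquire();
                int now = pending.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                CompletableFuture
                    .runAsync(() -> { /* simulated write */ }, ioThread)
                    .whenComplete((result, error) -> {
                        // Decrement before releasing so pending never exceeds the cap.
                        pending.decrementAndGet();
                        permits.release();
                    });
            }
            // Wait for every outstanding write to complete.
            permits.acquire(maxInFlight);
            return peak.get();
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak pending writes: " + send(10000, 100));
    }
}
```

The producer thread blocks in `acquire()` instead of queueing more work, so the backlog stays at or below `maxInFlight` no matter how slowly the writes drain.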
Upvotes: 2