Reputation: 67
I've written a simple asynchronous TCP server with NIO. The server should be able to read and write at the same time for each client. I've implemented this with a simple packet queue:
public class TcpJobHandler {
private BlockingQueue<TcpJob> _packetQueue = new LinkedBlockingQueue<TcpJob>();
private Thread _jobThread;
private final ReentrantLock _lock = new ReentrantLock();
public TcpJobHandler(){
_jobThread = new Thread(new Runnable() {
@Override
public void run() {
jobLoop();
}
});
_jobThread.start();
}
private void jobLoop(){
while(true){
try {
_lock.lock();
TcpJob job = _packetQueue.take();
if(job == null){
continue;
}
job.execute();
} catch (Exception e) {
AppLogger.error("Failed to dequeue packet from job queue.", e);
}finally{
_lock.unlock();
}
}
}
public void insertJob(TcpJob job){
try{
_packetQueue.put(job);
}catch(InterruptedException e){
AppLogger.error("Failed to queue packet to the tcp job queue.", e);
}
}
}
All this code does is check for a new packet. If a new packet is available, it is sent to the client. The TcpJob class holds just the packet to send and a writer class which writes the packet into the client stream. As you can see, only one thread should be able to write a packet into a client stream.
This is why I don't understand why I'm getting this error. If I'm right, the exception says that I'm trying to send data into a stream while another thread is already writing data into that stream. But why?
//Edit: I'm getting this exception:
19:18:41.468 [ERROR] - [mufisync.server.data.tcp.handler.TcpJobHandler] : Failed to dequeue packet from job queue. Exception: java.nio.channels.WritePendingException
at sun.nio.ch.AsynchronousSocketChannelImpl.write(Unknown Source)
at sun.nio.ch.AsynchronousSocketChannelImpl.write(Unknown Source)
at mufisync.server.data.tcp.stream.OutputStreamAdapter.write(OutputStreamAdapter.java:35)
at mufisync.server.data.tcp.stream.OutputStreamAdapter.write(OutputStreamAdapter.java:26)
at mufisync.server.data.tcp.stream.BinaryWriter.write(BinaryWriter.java:21)
at mufisync.server.data.tcp.TcpJob.execute(TcpJob.java:29)
at mufisync.server.data.tcp.handler.TcpJobHandler.jobLoop(TcpJobHandler.java:40)
at mufisync.server.data.tcp.handler.TcpJobHandler.access$0(TcpJobHandler.java:32)
at mufisync.server.data.tcp.handler.TcpJobHandler$1.run(TcpJobHandler.java:25)
at java.lang.Thread.run(Unknown Source)
The TcpJob looks like this:
public class TcpJob {
private BasePacket _packet;
private BinaryWriter _writer;
public TcpJob(BasePacket packet, BinaryWriter writer){
_packet = packet;
_writer = writer;
}
public void execute(){
try {
if(_packet == null){
AppLogger.warn("Tcp job packet is null");
return;
}
_writer.write(_packet.toByteArray());
} catch (IOException e) {
AppLogger.error("Failed to write packet into the stream.", e);
}
}
public BasePacket get_packet() {
return _packet;
}
}
The BinaryWriter is just coupled to an AsynchronousSocketChannel: its write(byte[]) method passes the data on to the socket channel's write method.
Upvotes: 1
Views: 448
Reputation: 533690
You are using asynchronous NIO2. With asynchronous I/O you cannot call write() again until the previous write has completed. From the Javadoc:
* @throws WritePendingException
* If a write operation is already in progress on this channel
For example, if you have used
public abstract Future<Integer> write(ByteBuffer src);
you cannot write again until the returned Future has completed, i.e. until Future.get() returns.
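As a minimal sketch of the Future-based variant: the helper below waits on each Future before issuing the next write, and loops because a single write may transfer only part of the buffer. The class name FutureWriteSketch and the loopback setup in roundTrip are illustrative assumptions, not part of the code in the question.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper class, for illustration only.
class FutureWriteSketch {

    // Writes the whole buffer and never starts a write while another is pending.
    static void writeFully(AsynchronousSocketChannel channel, ByteBuffer buffer)
            throws InterruptedException, ExecutionException {
        while (buffer.hasRemaining()) {
            Future<Integer> pending = channel.write(buffer);
            pending.get(); // blocks until this write has completed
        }
    }

    // Loopback demo for small payloads: sends the payload through a local
    // connection and returns the bytes that arrived on the other side.
    static byte[] roundTrip(byte[] payload) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0))) {
            Future<AsynchronousSocketChannel> accepted = server.accept();
            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get();
                try (AsynchronousSocketChannel peer = accepted.get()) {
                    writeFully(client, ByteBuffer.wrap(payload));
                    ByteBuffer in = ByteBuffer.allocate(payload.length);
                    while (in.hasRemaining()) {
                        if (peer.read(in).get() < 0) {
                            break; // connection closed early
                        }
                    }
                    byte[] out = new byte[in.position()];
                    in.flip();
                    in.get(out);
                    return out;
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("loopback demo failed", e);
        }
    }
}
```

Blocking on get() is simple, but it ties up the calling thread for the duration of each write, which gives up much of the benefit of an asynchronous channel.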
If you use
public abstract <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
you cannot write again until the CompletionHandler has been called.
Note: you cannot perform two reads at once either.
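One way to respect this rule without blocking is to queue outgoing buffers per channel and start the next write only from the CompletionHandler. The following is a rough sketch under that assumption. QueuedChannelWriter and QueuedChannelWriterDemo are hypothetical names, and error handling is reduced to dropping the queue.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Future;

// Hypothetical helper: serializes writes to one channel so at most one is in flight.
class QueuedChannelWriter implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel channel;
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private boolean writing = false; // guarded by "this"

    QueuedChannelWriter(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    // May be called from any thread; queues the packet if a write is in progress.
    void send(byte[] packet) {
        ByteBuffer buffer = ByteBuffer.wrap(packet);
        synchronized (this) {
            if (writing) {
                pending.add(buffer);
                return;
            }
            writing = true;
        }
        channel.write(buffer, buffer, this);
    }

    @Override
    public void completed(Integer bytesWritten, ByteBuffer buffer) {
        ByteBuffer next = buffer; // a partial write continues with the same buffer
        if (!buffer.hasRemaining()) {
            synchronized (this) {
                next = pending.poll();
                if (next == null) {
                    writing = false; // queue drained, channel is idle again
                    return;
                }
            }
        }
        channel.write(next, next, this);
    }

    @Override
    public void failed(Throwable cause, ByteBuffer buffer) {
        synchronized (this) {
            pending.clear();
            writing = false;
        }
        cause.printStackTrace();
    }
}

class QueuedChannelWriterDemo {
    // Sends three packets back-to-back over a loopback connection and returns the received text.
    static String run() {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0))) {
            Future<AsynchronousSocketChannel> accepted = server.accept();
            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get();
                try (AsynchronousSocketChannel peer = accepted.get()) {
                    QueuedChannelWriter writer = new QueuedChannelWriter(client);
                    writer.send("one,".getBytes(StandardCharsets.US_ASCII));
                    writer.send("two,".getBytes(StandardCharsets.US_ASCII));
                    writer.send("three".getBytes(StandardCharsets.US_ASCII));
                    ByteBuffer in = ByteBuffer.allocate(13); // total length of the three packets
                    while (in.hasRemaining() && peer.read(in).get() >= 0) { }
                    return new String(in.array(), 0, in.position(), StandardCharsets.US_ASCII);
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("loopback demo failed", e);
        }
    }
}
```

With a writer like this, one instance would be kept per client channel, and callers could invoke send directly instead of going through a dedicated job thread.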
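In your case you want something like this:
// Assumption: _writer.write(ByteBuffer) forwards to AsynchronousSocketChannel.write
// and returns its Future<Integer>. This state must be shared by all jobs that
// write to the same channel, e.g. kept in the writer rather than in each TcpJob.
ByteBuffer lastBuffer = null;
Future<Integer> future = null;
public void execute(){
try {
if(_packet == null){
AppLogger.warn("Tcp job packet is null");
return;
}
// Wait until the last buffer has been written completely.
while (future != null) {
future.get();
if (lastBuffer.remaining() > 0)
future = _writer.write(lastBuffer);
else
break;
}
// Start another write.
lastBuffer = ByteBuffer.wrap(_packet.toByteArray());
future = _writer.write(lastBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status
AppLogger.error("Interrupted while writing packet into the stream.", e);
} catch (ExecutionException e) {
AppLogger.error("Failed to write packet into the stream.", e);
}
}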
Upvotes: 2