user3149497
user3149497

Reputation: 67

Tcp server WritePendingException although thread locks

I've written a simple asynchronous tcp server with the nio. The server should be able to read and write at the same time for each client. This I've implemented with a simple packet queue.

public class TcpJobHandler {

private BlockingQueue<TcpJob> _packetQueue = new LinkedBlockingQueue<TcpJob>();
private Thread _jobThread;
private final ReentrantLock _lock = new ReentrantLock();

public TcpJobHandler(){
    _jobThread = new Thread(new Runnable() {
        @Override
        public void run() {
            jobLoop();
        }       
    });

    _jobThread.start();
}

private void jobLoop(){
    while(true){
        try {
            _lock.lock();
            TcpJob job = _packetQueue.take();
            if(job == null){
                continue;
            }
            job.execute();
        } catch (Exception e) {
            AppLogger.error("Failed to dequeue packet from job queue.", e);
        }finally{
            _lock.unlock();
        }
    }
}

public void insertJob(TcpJob job){
    try{
        _packetQueue.put(job);
    }catch(InterruptedException e){
        AppLogger.error("Failed to queue packet to the tcp job queue.", e);
    }
}
}

What this code do, is just checking for a new packet. If a new packet is available, this packet will be send to the client. In the class tcp job, there is just the packet to send and a write class whichs writes the packet into the client stream. As you can see, only one thread should be able to write a packet into a client stream.

This is the point, why I dont understand, why I'm getting this error? If I'm right, this exception says, that I try to send data into a stream, but there is already a thread which is writing data into this stream. But why?

//Edit: I'm getting this exception:

19:18:41.468 [ERROR] - [mufisync.server.data.tcp.handler.TcpJobHandler] : Failed to dequeue packet from job queue. Exception: java.nio.channels.WritePendingException
at sun.nio.ch.AsynchronousSocketChannelImpl.write(Unknown Source)
at sun.nio.ch.AsynchronousSocketChannelImpl.write(Unknown Source)
at mufisync.server.data.tcp.stream.OutputStreamAdapter.write(OutputStreamAdapter.java:35)
at mufisync.server.data.tcp.stream.OutputStreamAdapter.write(OutputStreamAdapter.java:26)
at mufisync.server.data.tcp.stream.BinaryWriter.write(BinaryWriter.java:21)
at mufisync.server.data.tcp.TcpJob.execute(TcpJob.java:29)
at mufisync.server.data.tcp.handler.TcpJobHandler.jobLoop(TcpJobHandler.java:40)
at mufisync.server.data.tcp.handler.TcpJobHandler.access$0(TcpJobHandler.java:32)
at mufisync.server.data.tcp.handler.TcpJobHandler$1.run(TcpJobHandler.java:25)
at java.lang.Thread.run(Unknown Source)

The TcpJob looks like this:

public class TcpJob {

private BasePacket _packet;
private BinaryWriter _writer;

public TcpJob(BasePacket packet, BinaryWriter writer){
    _packet = packet;
    _writer = writer;
}

public void execute(){
    try {
        if(_packet == null){
            AppLogger.warn("Tcp job packet is null");
            return;
        }

        _writer.write(_packet.toByteArray());
    } catch (IOException e) {
        AppLogger.error("Failed to write packet into the stream.", e);
    }
}

public BasePacket get_packet() {
    return _packet;
}   
}

The BinaryStream is just coupled to a AsynchronousSocketChannel which calls the write(byte[]) method from the socket channel.

Upvotes: 1

Views: 448

Answers (1)

Peter Lawrey
Peter Lawrey

Reputation: 533690

You are using asynchronous NIO2. When you use asynchronous IO you cannot call write() until the last write has completed. From the Javadoc

 * @throws  WritePendingException
 *          If a write operation is already in progress on this channel

e.g. if you have used

public abstract Future<Integer> write(ByteBuffer src);

you cannot write again until this Future.get() returns.

If you use

public abstract <A> void write(ByteBuffer src,
                               long timeout,
                               TimeUnit unit,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler);

You cannot write again until the CompletionHandler is called.

Note: you cannot be performing two reads at once either.

In your case you want something like

ByteBuffer lastBuffer = null;
Future<Integer> future = null;

public void execute(){
    try {
        if(_packet == null){
            AppLogger.warn("Tcp job packet is null");
            return;
        }
        // need to wait until the last buffer was written completely.
        while (future != null) {
           future.get();
           if (lastBuffer.remaining() > 0)
              future = _writer.write(lasBuffer);
           else
              break;
        }
        // start another write.
        future = _writer.write(lastBuffer = _packet.toByteArray());
    } catch (IOException e) {
        AppLogger.error("Failed to write packet into the stream.", e);
    }
}

Upvotes: 2

Related Questions