Reputation: 1195
When I run the below locally (on my own computer) it works fine - I can send messages to it and it reads them in properly. As soon as I put this on a remote server and send a message, only half the message gets read.
try {
this.asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(80));
this.asynchronousServerSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Void att) {
try {
asynchronousServerSocketChannel.accept(null, this);
ByteBuffer byteBuffer = ByteBuffer.allocate(10485760);
asynchronousSocketChannel.read(byteBuffer).get(120000, TimeUnit.SECONDS);
byteBuffer.flip();
System.out.println("request: " + Charset.defaultCharset().decode(byteBuffer).toString());
} catch (CorruptHeadersException | CorruptProtocolException | MalformedURLException ex) {
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
}
}
@Override
public void failed(Throwable exc, Void att) {
}
});
} catch (IOException ex) {
}
I've looked around at other questions and tried some of the answers but nothing worked so far. I thought the cause might be that it's timing out due to it being slower over the network when it's placed remotely but increasing the timeout didn't resolve the issue. I also considered that the message might be too large but allocating more capacity to the ByteBuffer
didn't resolve the issue either.
Upvotes: 0
Views: 1691
Reputation: 21
You can utilize lambda functions when receiving data from asynchronous socket. Herein the read completion handler is created using standalone method newReadHandler(...)
with lambda functions as parameters. The asynchronous reading is started with readLambda.accept(handler);
statement placed at the end of the following Java code. The completed(...)
callback method is invoked asynchronously so that algorithm doesn't collect function calls on the stack.
private CompletionHandler<Integer, LogConsole> newReadHandler(
BiFunction<Integer, LogConsole, Boolean> completedLambda,
BiConsumer<Throwable, LogConsole> failedLambda,
Consumer<CompletionHandler<Integer, LogConsole>> nextReadLambda) {
CompletionHandler<Integer, LogConsole> completionHandler = new CompletionHandler<Integer, LogConsole>() {
@Override
public void completed(Integer result, LogConsole console) {
if (!completedLambda.apply(result, console))
return;
nextReadLambda.accept(newReadHandler(completedLambda, failedLambda, nextReadLambda));
}
@Override
public void failed(Throwable exception, LogConsole console) {
failedLambda.accept(exception, console);
}
};
return completionHandler;
}
public readFromClientSocket(AsynchronousSocketChannel socket) {
BiFunction<Integer, LogConsole, Boolean> completedLambda = (readResult, readConsole) -> {
// Accept successful read operation.
};
BiConsumer<Throwable, LogConsole> failedLambda = (readException, readConsole) -> {
// Read exception.
};
Consumer<CompletionHandler<Integer, LogConsole>> readLambda = (handler) -> {
// Read operation.
socket.read(console.inputBuffer, 10, TimeUnit.SECONDS, console, handler);
};
// Create completion handler.
CompletionHandler<Integer, LogConsole> handler = newReadHandler(completedLambda, failedLambda, readLambda);
// Run first read operation with firt completion handler.
readLambda.accept(handler);
}
Upvotes: 1
Reputation: 363
I believe your issue is with the Asynchronous nature of the code you're using. What you have is an open connection and you've called the asynchronous read
method on your socket.
This reads n
bytes from the channel where n
is anything from 0
to the size of your available buffer.
I firmly believe that you have to read in a loop. That is, with Java's A-NIO; you'd need to call read
again from your completed
method on your CompletionHandler
by, possibly, passing in the AsynchronousSocketChannel
as an attachment to a new completed
method on a CompletionHandler
you create for read
, not the one you already have for accept
methods.
I think this is the same sort of pattern you'd use where you'd call accept
again with this
as the completion handler from your completed
method in the CompletionHandler
you're using for the accept
method call.
It then becomes important to put an "Escape" clause into your CompletionHandler
for instance, if the result
is -1
or if the ByteBuffer
had read X
number of bytes based on what you're expecting, or based on if the final byte
in the ByteBuffer
is a specific message termination byte that you've agreed with the sending application.
The Java Documentation on the matter goes so far as to say the read
method will only read the amount of bytes on the dst
at the time of invocation.
In Summary; the completed
method call for the handler for the read
seems to execute once something was written to the channel; but if something is being streamed you could get half of the bytes, so you'd need to continue reading until you're satisfied you've got the end of what they were sending.
Below is some code I knocked together on reading until the end, responding whilst reading, asynchronously. It, unlike myself, can talk and listen at the same time.
public class ReadForeverCompletionHandler implements CompletionHandler<Integer, Pair<AsynchronousSocketChannel, ByteBuffer>> {
@Override
public void completed(Integer bytesRead, Pair<AsynchronousSocketChannel, ByteBuffer> statefulStuff) {
if(bytesRead != -1) {
final ByteBuffer receivedByteBuffer = statefulStuff.getRight();
final AsynchronousSocketChannel theSocketChannel = statefulStuff.getLeft();
if (receivedByteBuffer.position()>8) {
//New buffer as existing buffer is in use
ByteBuffer response = ByteBuffer.wrap(receivedByteBuffer.array());
receivedByteBuffer.clear(); //safe as we've not got any outstanding or in progress reads, yet.
theSocketChannel.read(receivedByteBuffer,statefulStuff,this); //Basically "WAIT" on more data
Future<Integer> ignoredBytesWrittenResult = theSocketChannel.write(response);
}
}
else {
//connection was closed code
try {
statefulStuff.getLeft().shutdownOutput(); //maybe
}
catch (IOException somethingBad){
//fire
}
}
}
@Override
public void failed(Throwable exc, Pair<AsynchronousSocketChannel, ByteBuffer> attachment) {
//shout fire
}
The read is originally kicked off by a call from the completed
method in the handler from the very original asynchronous accept
on the server socket like
public class AcceptForeverCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Pair<AsynchronousServerSocketChannel, Collection<AsynchronousSocketChannel>>> {
private final ReadForeverCompletionHandler readForeverAndEverAndSoOn = new ReadForeverCompletionHandler();
@Override
public void completed(AsynchronousSocketChannel result, Pair<AsynchronousServerSocketChannel, Collection<AsynchronousSocketChannel>> statefulStuff) {
statefulStuff.getLeft().accept(statefulStuff, this); //Accept more new connections please as we go
statefulStuff.getRight().add(result); //Collect these in case we want to for some reason, I don't know
ByteBuffer buffer = ByteBuffer.allocate(4098); //4k seems a nice number
result.read(buffer, Pair.of(result, buffer ),readForeverAndEverAndSoOn); //Kick off the read "forever"
}
@Override
public void failed(Throwable exc, Pair<AsynchronousServerSocketChannel, Collection<AsynchronousSocketChannel>> attachment) {
//Shout fire
}
}
Upvotes: 0