Reputation: 55
We wrote an incoming reactor that works this way:
The shutting down procedure of the reactor is iterating over the selector.keys()
and for each of them closing the corresponding channel and cancelling the key.
We wrote the following unit test for the shutdown procedure:
The test causes ConcurrentModificationException pointing to the loop iterating over the sockets and closes them (which was in the main thread context).
Our assumption is that when a Sender read method got -1, it closed the socket and somehow it woke up the selector select method, The selector then accessed its keys set which was iterated by the shutdown loop and hence the exception.
We worked around this problem by creating a new list with all the keys of the selector. Canceling those keys by iterating this list prevent two objects from modifying the same key's set.
Our question are:
EDIT: Added some code snippets for clarifications (We tried to narrow the code as possible)
IncomingReactor:
public boolean startAcceptingIncomingData() {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open());
serverSocketChannel.bind(new InetSocketAddress(incomingConnectionsPort));
serverSocketChannel.configureBlocking(false);
SelectionKey acceptorSelectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
acceptorSelectionKey.attach((Worker) this::acceptIncomingSocket);
startSelectionLoop(selector);
return true;
}
private boolean acceptIncomingSocket() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(new WorkerImpl() /*Responsible for reading data and tranferring it into a parsing thread*/);
return true;
} catch (IOException e) {
return false;
}
}
private void startSelectionLoop(Selector selector) {
shouldLoop = true;
while (shouldLoop) {
try {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (!shouldLoop) {
break;
}
selectedKeys.forEach((key) -> {
boolean workSuccess = ((Worker) key.attachment()).work();
if (!workSuccess) {
key.channel().close();
key.cancel();
}
});
selectedKeys.clear();
} catch (ClosedSelectorException ignore) {
}
}
}
public void shutDown() {
shouldLoop = false;
selector.keys().forEach(key -> { /***EXCEPTION - This is where the exception points to (this is line 129) ***/
key.channel().close();
key.cancel();
});
try {
selector.close();
} catch (IOException e) {
}
}
UnitTest:
@Test
public void testMaximumConnectionsWithMultipleThreads() {
final int PORT = 24785;
final int MAXINUM_CONNECTIONS = 10;
IncomingReactor incomingReactor = new IncomingReactor(PORT);
Callable<Boolean> acceptorThread = () -> {
incomingReactor.startAcceptingIncomingData();
return true;
};
ExecutorService threadPool = Executors.newFixedThreadPool(MAXIMUM_CONNECTIONS + 1);
Future<Boolean> acceptorFuture = threadPool.submit(acceptorThread);
List<Future<Boolean>> futureList = new ArrayList<>(MAXIMUM_CONNECTIONS);
for (int currentSenderThread = 0; currentSenderThread < MAXIMUM_CONNECTIONS; currentSenderThread++) {
Future<Boolean> senderFuture = threadPool.submit(() -> {
Socket socket = new Socket(LOCALHOST, PORT);
int bytesRead = socket.getInputStream().read();
if (bytesRead == -1) { //The server has closed us
socket.close();
return true;
} else {
throw new RuntimeException("Got real bytes from socket.");
}
});
futureList.add((senderFuture));
}
Thread.sleep(1000); //We should wait to ensure that the evil socket is indeed the last one that connects and the one that will be closed
Socket shouldCloseSocket = new Socket(LOCALHOST, PORT);
Assert.assertEquals(shouldCloseSocket.getInputStream().read(), -1);
shouldCloseSocket.close();
incomingReactor.shutDown();
for (Future<Boolean> senderFuture : futureList) {
senderFuture.get();
}
acceptorFuture.get();
threadPool.shutdown();
}
Exception:
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
at java.util.HashMap$KeyIterator.next(HashMap.java:1461)
at java.lang.Iterable.forEach(Iterable.java:74)
at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
at mypackage.IncomingReactor.shutDown(IncomingReactor.java:129)
at mypackage.tests.TestIncomingReactor.testMaximumConnectionsWithMultipleThreads(TestIncomingReactor.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:85)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:659)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:845)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1153)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:108)
at org.testng.TestRunner.privateRun(TestRunner.java:771)
at org.testng.TestRunner.run(TestRunner.java:621)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:357)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:352)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:310)
at org.testng.SuiteRunner.run(SuiteRunner.java:259)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1199)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1124)
at org.testng.TestNG.run(TestNG.java:1032)
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:74)
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Upvotes: 1
Views: 709
Reputation: 1797
You cannot modify the selector.keys()
Set<SelectionKey>
from inside of the for loop because that Set
is not capable of concurrent modification. (calling channel.close()
will modify the Set
from inside the loop reading the Set
)
https://docs.oracle.com/javase/7/docs/api/java/util/HashSet.html
The iterators returned by this class's iterator method are fail-fast: if the set is modified at any time after the iterator is created, in any way except through the iterator's own remove method, the Iterator throws a ConcurrentModificationException. Thus, in the face of concurrent modification, the iterator fails quickly and cleanly, rather than risking arbitrary, non-deterministic behavior at an undetermined time in the future.
SelectionKey[] keys = selector.keys().toArray(new SelectionKey[0]);
for( SelectionKey k : keys )
{
try
{
k.channel().close();
}
catch(Throwable x )
{
// print
}
}
try
{
selector.close();
}
catch(IoException e )
{
// print
}
Upvotes: 1
Reputation: 311023
The shutting down procedure of the reactor is iterating over the selector.keys() and for each of them closing the corresponding channel and cancelling the key.
It should start by stopping the selector loop. NB Closing the channel cancels the key. You don't have to cancel it yourself.
We wrote the following unit test for the shutdown procedure:
Open a reactor thread running the selction loop. Open several Sender threads. Each opens a socket to the reactor and reads. The read blocks until it gets -1 (meaning the reactor closed the socket).
The reactor closed its accepted socket. Your client socket remained open.
After the read returns -1, the sender closes the socket and finishes.
I hope this means the sender closed its client socket.
The test causes
ConcurrentModificationException
pointing to the loop iterating over the sockets and closes them (which was in the main thread context).
Really? I don't see any stack trace in your question.
Our assumption is that when a Sender read method got -1, it closed the socket and somehow it woke up the selector select method
Not possible unless the reactor didn't close the channel, in which case you wouldn't have got -1 from read etc.
The selector then accessed its keys set which was iterated by the shutdown loop and hence the exception.
The exception is caused by modifying the key set during iteration. Bug in your server code.
We worked around this problem by creating a new list with all the keys of the selector. Canceling those keys by iterating this list prevent two objects from modifying the same key's set.
You need to fix the actual problem, and for that you need to post the actual code.
Our question are:
Is our assumption correct? When the client socket calls the close method- does it really wake up the selector?
Not unless the selector-end channel is still open.
Does the creation of a new list is the appropriate solution or is it just a work-around?
It is just a nasty workaround for a problem you haven't identified yet.
Upvotes: 2