Reputation: 157
I'm writing some code that reads log lines and processes that data in the background. This processing would probably benefit from parallelization, such as that offered by the Stream.parallel() method, which I was attempting to use. This is the code I started out with, and it works perfectly.
public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
This code connects and prints out all of my data. I would very much like to restructure this code as follows:
public static void main(String[] args) {
    try (Socket socket = new Socket(ADDRESS, PORT);
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
But sadly this doesn't work. Even worse, going back to the original code, this doesn't even work:
public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().parallel().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
All that was added here was a .parallel() call, and now nothing works: the program just sits there and nothing is printed out.
I can live perfectly well without the 2nd version using the modified try (A a = new A()) {} form, as that doesn't look too good in this case. What I can't live without is figuring out why this .parallel() call breaks everything.
I'm assuming the modified try statement closes the streams as soon as I fall out of it (right after we start the forEach), so they are being killed and GC'd before the operation runs. I can't for the life of me figure out what the hell is going on with the .parallel() call.
As requested, here is the output of jstack running on the .parallel() version of this code.
Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007fd4f4001000 nid=0x4907 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd5280be000 nid=0x48d2 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd5280bb000 nid=0x48d1 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd5280b9800 nid=0x48d0 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd5280b6800 nid=0x48cf waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd5280b5000 nid=0x48ce runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd528082000 nid=0x48cd in Object.wait() [0x00007fd515c6d000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd52807d800 nid=0x48cc in Object.wait() [0x00007fd515d6e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"main" #1 prio=5 os_prio=0 tid=0x00007fd528008000 nid=0x48c2 runnable [0x00007fd52fd9f000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
- locked <0x00000000ec086790> (a java.net.SocksSocketImpl)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at com.gravypod.Test.main(Test.java:48)
"VM Thread" os_prio=0 tid=0x00007fd528075800 nid=0x48ca runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd52801d800 nid=0x48c4 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd52801f000 nid=0x48c5 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd528021000 nid=0x48c6 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd528022800 nid=0x48c7 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007fd5280c0800 nid=0x48d3 waiting on condition
JNI global references: 18
The Test.java:48 line is the Socket socket = new Socket line. Below is the jstack output of the fully working non-parallel code (just using .lines()).
Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007f9048001000 nid=0x4982 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f90800be800 nid=0x496f runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f90800bb000 nid=0x496e waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f90800b9800 nid=0x496d waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f90800b6800 nid=0x496c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f90800b5000 nid=0x496b runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f9080082000 nid=0x496a in Object.wait() [0x00007f907018d000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f908007d800 nid=0x4969 in Object.wait() [0x00007f907028e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"main" #1 prio=5 os_prio=0 tid=0x00007f9080008000 nid=0x4961 runnable [0x00007f90884c3000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000ec08e890> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x00000000ec08e890> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at java.io.BufferedReader$1.hasNext(BufferedReader.java:571)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at com.gravypod.Test.main(Test.java:51)
"VM Thread" os_prio=0 tid=0x00007f9080075800 nid=0x4968 runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f908001d800 nid=0x4963 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f908001f000 nid=0x4964 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f9080021000 nid=0x4965 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f9080022800 nid=0x4966 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f90800c1000 nid=0x4970 waiting on condition
JNI global references: 319
The line Test.java:51 is the reader.lines().forEach line.
Upvotes: 1
Views: 3797
Reputation: 298439
It seems your application isn’t technically hanging; it is just waiting for a lot of input before performing any observable work. This is a combination of two implementation details. When you start a parallel stream operation, it first tries to split the workload until every CPU core has something to do, before actually starting to process elements. This combines with the issue “Reader#lines() parallelizes badly due to nonconfigurable batch size”.
Simply put, when a Stream has an unknown size, the implementation will try to buffer batches whose sizes are multiples of 1024, growing on each split. This great answer shows how the splitting happens for a stream of unknown size with multiple cores, showing that multiples of 1024 elements get buffered in the process. This can take a very long time before the consumer passed to forEach is ever invoked.
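To make the batching visible, here is a minimal sketch. It assumes an OpenJDK-style implementation of Spliterators.spliteratorUnknownSize, which is the kind of spliterator BufferedReader.lines() is built on. The class name BatchSizeDemo and the endless counting iterator are made up for illustration; the iterator stands in for a socket that keeps delivering lines. The sketch measures how many elements each trySplit() call pulls out of the source before it returns.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the question's code.
class BatchSizeDemo {

    /** Returns how many elements each of the first `splits` trySplit() calls pulled from the source. */
    static long[] batchSizes(int splits) {
        AtomicLong pulled = new AtomicLong();

        // Endless iterator standing in for a connection that never closes.
        Iterator<String> endless = new Iterator<String>() {
            @Override public boolean hasNext() { return true; }
            @Override public String next() { return "line " + pulled.incrementAndGet(); }
        };

        // Same kind of spliterator that a stream of unknown size is based on.
        Spliterator<String> source = Spliterators.spliteratorUnknownSize(
                endless, Spliterator.ORDERED | Spliterator.NONNULL);

        long[] sizes = new long[splits];
        for (int i = 0; i < splits; i++) {
            long before = pulled.get();
            source.trySplit();                 // copies a whole batch into an array before returning
            sizes[i] = pulled.get() - before;  // elements consumed by this single split
        }
        return sizes;
    }

    public static void main(String[] args) {
        for (long size : batchSizes(3)) {
            System.out.println("split buffered " + size + " elements");
        }
    }
}
```

With a socket as the source, each of those elements is a blocking readLine() call, so a slow log stream can keep the splitting phase busy for a long time before any element reaches forEach.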
Note that processing an infinite source via the non-short-circuiting forEach is outside the scope of the Stream API anyway. Assuming a timely side effect is an assumption about the processing order of the Stream, but there is no guarantee about it.
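As a small illustration of the ordering point, the following sketch runs the same parallel pipeline once with forEach and once with forEachOrdered. The class name ForEachOrderDemo and the integer range are assumptions chosen for the example. Only the forEachOrdered variant is specified to see the elements in encounter order; the forEach variant sees the same elements, but in whatever order the worker threads happen to reach them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the question's code.
class ForEachOrderDemo {

    /** Collects 0..n-1 from a parallel stream, using forEachOrdered or plain forEach. */
    static List<Integer> collect(boolean ordered, int n) {
        // Synchronized list, because the consumer may run on several threads.
        List<Integer> seen = Collections.synchronizedList(new ArrayList<Integer>());
        if (ordered) {
            IntStream.range(0, n).parallel().forEachOrdered(i -> seen.add(i));
        } else {
            IntStream.range(0, n).parallel().forEach(i -> seen.add(i));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("forEachOrdered: " + collect(true, 16));
        System.out.println("forEach:        " + collect(false, 16));
    }
}
```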
This answer guides you to a work-around. You can use something like
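try (Socket socket = new Socket(ADDRESS, PORT);
     BufferedReader reader = new BufferedReader(
         new InputStreamReader(socket.getInputStream()))) {
    socket.getOutputStream().write(QUERY);
    // Stream.generate() yields an infinite, unordered stream that can be split
    // without first buffering batches of elements.
    // Caveat: it never terminates; at end of input readLine() keeps returning null.
    Stream.generate(() -> {
        try { return reader.readLine(); }
        catch (IOException ex) { throw new UncheckedIOException(ex); }
    }).parallel().forEach(System.out::println);
} catch (IOException | UncheckedIOException e) {
    e.printStackTrace();
}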
But, as said, this isn’t the intended use case of the Stream API…
Upvotes: 0
Reputation: 301
I imagine that parallel() or the forEach() on a parallel stream waits to read all the input before parallelising the task. Because the server never closes the connection, it will wait forever.
Your task is not really parallelizable. Data comes sequentially over the wire, so reading it in parallel cannot work.
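If the goal is to parallelize the processing rather than the reading, one alternative (not proposed in the answers above, so treat it as a sketch) is to keep reading lines sequentially on one thread and hand each line to a thread pool. The class name SequentialReadParallelProcess, the method process, and the worker count are made up for illustration, and the demo in main uses a StringReader in place of the socket so that it runs on its own.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper class, not part of the question's code.
class SequentialReadParallelProcess {

    /** Reads lines one by one on the calling thread and processes each on a worker thread. */
    static void process(BufferedReader reader, Consumer<String> handler, int workers)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            String line;
            while ((line = reader.readLine()) != null) {   // null means end of input
                final String current = line;
                pool.submit(() -> handler.accept(current)); // processing order is not guaranteed
            }
        } finally {
            pool.shutdown();                                // stop accepting new work
            pool.awaitTermination(1, TimeUnit.MINUTES);     // wait for submitted lines to finish
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for new BufferedReader(new InputStreamReader(socket.getInputStream())).
        BufferedReader reader = new BufferedReader(new StringReader("first\nsecond\nthird\n"));
        process(reader,
                line -> System.out.println(Thread.currentThread().getName() + ": " + line),
                4);
    }
}
```

Each line is handed off as soon as it arrives, so output appears without waiting for a batch to fill. The pool's work queue is unbounded in this sketch, so a producer that is much faster than the handler would need some form of back-pressure.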
Upvotes: 1