Reputation: 73
Storm 0.9.6
Worker JVM args:
Config.TOPOLOGY_WORKER_CHILDOPTS: "-Xmx4096m -Xss256k -XX:MaxDirectMemorySize=4096m -XX:PermSize=512m "
I read data with a StoppableKafkaSpout, then try to parse each input line.
Counter function:
private final AtomicLong ct_in = new AtomicLong(0L);

public void countIn() {
    long cin = ct_in.getAndIncrement();
    if (cin % 100000 == 0) {
        System.err.println("[x] reading." + Thread.currentThread() + " count:" + cin);
    }
}
First test code: do nothing but count:
public void parse(String line) {
    countIn();
    if (true) {   // early return: do nothing beyond counting
        return;
    }
    // nothing to do...
}
First GC info and output:
$ jstat -gcutil 15510 1000 500
S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
0.00 0.49 64.44 6.26 97.61 95.35 217 1.144 2 0.118 1.262
0.00 5.78 46.35 6.26 97.61 95.35 219 1.152 2 0.118 1.270
0.00 7.82 38.42 6.26 97.61 95.35 221 1.160 2 0.118 1.278
0.00 6.45 16.13 6.26 97.61 95.35 223 1.168 2 0.118 1.286
4.80 0.00 82.64 6.26 97.61 95.35 224 1.171 2 0.118 1.290
1.39 0.00 59.54 6.26 97.61 95.35 226 1.180 2 0.118 1.299
1.02 0.00 39.77 6.26 97.61 95.35 228 1.188 2 0.118 1.307
0.00 6.22 91.23 6.26 97.61 95.35 229 1.193 2 0.118 1.311
0.00 1.17 81.15 6.26 97.64 95.38 231 1.202 2 0.118 1.320
0.00 3.23 59.05 6.26 97.64 95.38 233 1.210 2 0.118 1.329
0.00 2.57 50.69 6.26 97.64 95.38 235 1.219 2 0.118 1.337
0.00 0.54 13.92 6.26 97.64 95.38 237 1.227 2 0.118 1.345
0.31 0.00 86.84 6.26 97.64 95.38 238 1.230 2 0.118 1.348
0.00 0.17 99.73 6.26 97.64 95.38 239 1.234 2 0.118 1.352
0.38 0.00 87.92 6.26 97.64 95.38 240 1.239 2 0.118 1.357
1.34 0.00 37.79 6.26 97.65 95.38 242 1.246 2 0.118 1.364
0.63 0.00 0.00 6.26 97.65 95.38 244 1.258 2 0.118 1.377
0.00 0.35 50.55 6.26 97.65 95.38 245 1.263 2 0.118 1.381
0.00 1.36 0.00 6.26 97.66 95.38 247 1.269 2 0.118 1.387
6.27 0.00 64.02 6.26 97.66 95.38 248 1.273 2 0.118 1.391
5.71 0.00 61.99 6.26 97.66 95.38 250 1.281 2 0.118 1.399
3.32 0.00 69.13 6.26 97.67 95.38 252 1.288 2 0.118 1.407
6.17 0.00 36.57 6.26 97.69 95.38 254 1.296 2 0.118 1.415
5.74 0.00 5.70 6.26 97.71 95.38 256 1.305 2 0.118 1.423
0.00 11.27 84.73 6.26 97.71 95.38 257 1.310 2 0.118 1.428
0.00 9.28 91.32 6.26 97.71 95.38 259 1.318 2 0.118 1.436
0.00 3.21 78.01 6.26 97.71 95.38 261 1.326 2 0.118 1.445
0.00 19.92 71.34 6.26 97.71 95.38 263 1.336 2 0.118 1.455
0.00 3.44 40.88 6.26 97.72 95.38 265 1.345 2 0.118 1.463
0.00 10.53 34.00 6.26 97.72 95.38 267 1.352 2 0.118 1.470
0.00 1.12 6.22 6.26 97.73 95.38 269 1.360 2 0.118 1.478
0.50 0.00 91.07 6.26 97.73 95.38 270 1.364 2 0.118 1.482
3.10 0.00 97.11 6.26 97.73 95.38 272 1.372 2 0.118 1.490
11.48 0.00 89.29 6.26 97.73 95.38 274 1.383 2 0.118 1.502
12.19 0.00 78.67 6.26 97.73 95.38 276 1.393 2 0.118 1.511
6.01 0.00 53.11 6.26 97.73 95.38 278 1.400 2 0.118 1.518
0.81 0.00 4.09 6.26 97.73 95.38 280 1.409 2 0.118 1.527
0.00 8.06 76.45 6.26 97.73 95.38 281 1.414 2 0.118 1.533
output:
.........
2016-03-13T14:28:14.926+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:40900000
2016-03-13T14:28:15.465+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:41000000
2016-03-13T14:28:16.019+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:41100000
2016-03-13T14:28:16.501+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:41200000
2016-03-13T14:28:17.003+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:41300000
2016-03-13T14:28:17.542+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:41400000
As you can see, it worked normally and processed more than 40,000,000 lines.
Second version: I only added a split() operation:
public void parse(String line) {
    countIn();
    String[] fields = line.trim().split("\t"); // the only added operation
    if (true) {
        return;
    }
}
Second GC info:
................
0.00 100.00 0.00 81.59 98.00 96.71 277 9.968 38 1.224 11.193
0.00 100.00 44.39 78.65 98.00 96.71 279 10.067 38 1.224 11.291
100.00 0.00 0.00 81.76 98.01 96.71 282 10.170 38 1.224 11.394
100.00 0.00 55.90 85.37 98.01 96.71 284 10.264 38 1.224 11.488
0.00 100.00 0.00 90.49 98.06 96.71 287 10.404 39 1.228 11.632
0.00 100.00 55.95 93.95 98.06 96.71 289 10.503 40 1.261 11.764
0.00 100.00 96.74 83.56 98.06 96.71 292 10.605 40 1.261 11.866
96.69 0.00 49.89 87.89 98.06 96.71 294 10.729 40 1.261 11.990
0.00 100.00 22.86 91.05 98.06 96.71 297 10.830 41 1.264 12.094
0.00 100.00 69.56 94.34 98.08 96.71 299 10.908 41 1.264 12.172
81.73 93.15 100.00 85.74 98.08 96.71 302 10.983 42 1.311 12.294
96.22 0.00 79.59 88.45 98.08 96.71 304 11.086 42 1.311 12.397
0.00 100.00 51.37 92.46 98.08 96.71 307 11.185 44 1.315 12.500
84.06 0.00 3.87 88.05 98.08 96.71 310 11.291 44 1.360 12.650
100.00 0.00 82.37 89.45 98.08 96.71 312 11.332 44 1.360 12.692
0.00 100.00 38.60 94.44 98.10 96.71 315 11.460 45 1.363 12.823
0.00 100.00 72.97 88.06 98.10 96.71 317 11.533 46 1.389 12.922
100.00 0.00 3.99 92.62 98.10 96.71 320 11.645 47 1.393 13.038
95.22 0.00 81.26 90.02 98.10 96.71 322 11.712 48 1.452 13.164
0.00 100.00 41.85 94.77 98.10 96.71 325 11.832 49 1.455 13.287
95.03 0.00 0.00 93.60 98.10 96.71 328 11.946 50 1.509 13.455
Full GCs occurred again and again.
output:
2016-03-13T15:00:44.119+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:24900000
2016-03-13T15:00:52.117+0800 STDIO [ERROR] [x] reading.Thread[Thread-5-parser,5,main] count:25000000
And then the worker thread blocked because of repeated Full GCs...
JVM gc log:
2016-03-13T15:01:15.380+0800: 238.646: Total time for which application threads were stopped: 0.1699727 seconds, Stopping threads took: 0.0000422 seconds
2016-03-13T15:01:15.380+0800: 238.646: [CMS-concurrent-mark-start]
2016-03-13T15:01:15.483+0800: 238.748: [Full GC (Allocation Failure) 238.748: [CMS2016-03-13T15:01:16.055+0800: 239.320: [CMS-concurrent-mark: 0.673/0.674 secs] [Times: user=1.65 sys=0.03, real=0.68 secs]
jmap log:
$ jmap -histo xxxx | head
num #instances #bytes class name
----------------------------------------------
1: 5971981 3189464624 [B
2: 5945371 142688904 backtype.storm.messaging.TaskMessage
3: 359549 38503840 [Ljava.lang.Object;
4: 1808 31499496 [I
5: 65226 7823656 [C
6: 301860 7244640 java.util.ArrayList
7: 294871 7076904 java.util.concurrent.LinkedBlockingQueue$Node
So the problem is: why does the single line
line.trim().split("\t");
cause such a big difference? And what is the right way to process the lines I read from the KafkaSpout if I cannot use String.split (and some other APIs such as StringBuilder, which cause the same problem)?
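One common alternative (a sketch, not necessarily the right fix for this topology) is to extract fields by scanning with indexOf instead of split(). split("\t") allocates a new String[] plus one substring per field on every line; scanning for only the fields you need reduces, though does not eliminate, per-line garbage. The class and method names below are illustrative:

```java
public class TabParser {
    // Counts tab-separated fields without allocating anything.
    static int countFields(String line) {
        int count = 1;
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == '\t') count++;
        }
        return count;
    }

    // Extracts only the n-th field (0-based) instead of building the whole array.
    static String field(String line, int n) {
        int start = 0;
        for (int i = 0; i < n; i++) {
            int tab = line.indexOf('\t', start);
            if (tab < 0) return null;   // fewer fields than requested
            start = tab + 1;
        }
        int end = line.indexOf('\t', start);
        return end < 0 ? line.substring(start) : line.substring(start, end);
    }

    public static void main(String[] args) {
        String line = "a\tb\tc";
        System.out.println(countFields(line)); // prints 3
        System.out.println(field(line, 1));    // prints b
    }
}
```

Note this only reduces allocation pressure per line; if the heap histogram is dominated by queued TaskMessage objects, tuning heap and young-gen sizes (as in the answers below's spirit) may matter at least as much.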
Upvotes: 2
Views: 829
Reputation: 146
Most likely it's the parallelism of your bolts, or the Storm settings in storm.yaml and the topology submit configuration, that is causing it. Try setting the number of CPU-consuming bolts to the number of cores.
Also try increasing the minimum heap size: -Xms4G
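For example, in storm.yaml the worker heap minimum can be pinned to the maximum so the heap never has to grow under load (the values below are illustrative and should be merged with your existing childopts):

```
# storm.yaml (illustrative sizes)
worker.childopts: "-Xms4g -Xmx4g -Xss256k -XX:MaxDirectMemorySize=4096m -XX:PermSize=512m"
```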
Upvotes: 0
Reputation: 533530
Your survivor spaces are filling up. When a survivor space isn't big enough to take all the live data on a minor collection, objects get promoted early, which eventually triggers a full collection. Try increasing the size of your survivor spaces so that they never fill up.
Before, your survivor space was getting at most about 8% full:
0.00 7.82 38.42 6.26 97.61 95.35 221 1.160 2 0.118 1.278
After, your survivor spaces are getting 100% full:
0.00 100.00 0.00 81.59 98.00 96.71 277 9.968 38 1.224 11.193
0.00 100.00 44.39 78.65 98.00 96.71 279 10.067 38 1.224 11.291
100.00 0.00 0.00 81.76 98.01 96.71 282 10.170 38 1.224 11.394
100.00 0.00 55.90 85.37 98.01 96.71 284 10.264 38 1.224 11.488
While your operation seems harmless enough, if your survivor space is not big enough it can still blow out; i.e. you are creating a lot of medium-lived objects, which are always a problem.
I would try increasing the maximum heap size, which appears to be too small for your new code, and increasing your young generation:
-Xmx8g -Xmn4g
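If the survivor spaces specifically need to be bigger, the standard HotSpot -XX:SurvivorRatio flag controls their share of the young generation (the ratio below is illustrative):

```
# With -Xmn4g: young gen = eden + 2 survivors.
# SurvivorRatio=6 means eden:survivor = 6:1, so each survivor = 4g / (6+2) = 512m.
-Xmx8g -Xmn4g -XX:SurvivorRatio=6
```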
Upvotes: 4