Reputation: 3576
I'm having trouble creating a single instance and sharing it between all threads. Here's my code:
Here's the main method:
public static void main(String... args) throws IOException, ClassNotFoundException {
MainApp mainApp = new MainApp();
mainApp.init();
mainApp.multiThread();
}
Here's init():
private void init() {
HttpClient httpClient = HttpClientBuilder.create()
.setMaxConnTotal(TOTAL_CONNECTION)
.setMaxConnPerRoute(PER_ROUTE)
.build();
final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
restTemplate = new RestTemplate(requestFactory);
}
TOTAL_CONNECTION is 100, and PER_ROUTE is 100.
Here's multiThread():
private void multiThread() {
MaterializationChecker materializationChecker = new MaterializationChecker(restTemplate, new TotalInfo(0, 0));
materializationChecker.check();
}
Here's the TotalInfo class:
public class TotalInfo {
@Getter private int total;
@Getter private int aboveThreshold;
public TotalInfo(int total, int aboveThreshold) {
this.total = total;
this.aboveThreshold = aboveThreshold;
}
protected synchronized void increaseAboveThreshold() {
aboveThreshold++;
}
protected synchronized void increaseTotal() {
total++;
}
}
Here's the materializationChecker.check() method (threadCount is set to 10, taskCount is set to 100):
public boolean check() {
try {
executor = Executors.newFixedThreadPool(threadCount);
completionService = new ExecutorCompletionService<>(executor);
submit(taskCount);
destroy();
System.out.println("Check finished -> OK !!!");
} catch (Exception e) {
System.out.println("Exception while processing: " + e);
}
return true;
}
private void submit(int taskCount) throws InterruptedException, ExecutionException {
for (int i = 0; i < taskCount; i++) {
completionService.submit(new MaterializationCallable(totalInfo));
}
int doneNum = 0;
MaterializationCallable materializationCallable;
Future<MaterializationCallable> future;
long averageLatencyOfAllAverages = 0L, minLatencyOfAllMins = Long.MAX_VALUE, maxLatencyOfAllMaxs = Long.MIN_VALUE;
while ((future = this.completionService.take()) != null) {
materializationCallable = future.get();
doneNum++;
System.out.println("Task " + doneNum + " done.");
averageLatencyOfAllAverages += materializationCallable.getLatencies().get(0);
minLatencyOfAllMins = Math.min(minLatencyOfAllMins, materializationCallable.getLatencies().get(1));
maxLatencyOfAllMaxs = Math.max(maxLatencyOfAllMaxs, materializationCallable.getLatencies().get(2));
if (doneNum >= taskCount) break;
}
System.out.println("----\naverageLatencyOfAllAverages = " + averageLatencyOfAllAverages/taskCount + " milliseconds\nminLatencyOfAllMins = " + minLatencyOfAllMins
+ " ms\nmaxLatencyOfAllMaxs = " + maxLatencyOfAllMaxs + " ms");
// Cast to double so the ratio is not truncated by integer division.
System.out.println("total requests: " + totalInfo.getTotal() + ", total aboveThreshold: " + totalInfo.getAboveThreshold() + ", ratio (aboveThreshold/total): " + ((double) totalInfo.getAboveThreshold()/totalInfo.getTotal()));
System.out.println("all tasks have been done.");
}
private void destroy() {
if (this.executor != null && !executor.isShutdown()) {
System.out.println("Shutdown and wait for all worker threads to be terminated.");
this.executor.shutdownNow();
while (!this.executor.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Occurred InterruptedException: " + e.getMessage());
}
}
// Report success only once the executor has actually terminated.
System.out.println("Shutdown -> OK !!!");
}
}
Here's the code for the MaterializationCallable class:
public class MaterializationCallable implements Callable<MaterializationCallable> {
public static final int DURATION = 30;
private final TotalInfo totalInfo;
@Getter private List<Long> latencies;
public MaterializationCallable(TotalInfo totalInfo) {
this.latencies = new ArrayList<>();
this.totalInfo = totalInfo;
}
@Override
public MaterializationCallable call() throws Exception {
long totalLatency = 0;
long maxLatency = Long.MIN_VALUE;
long minLatency = Long.MAX_VALUE;
totalInfo.increaseTotal();
for (int i = 0; i < itemIds.size(); i++){
restTemplate.getForObject(endpoint, byte[].class);
if (i != 0) {
long oneLatency = receiveLatency + desiralizeLatency;
totalLatency += oneLatency;
if (minLatency > oneLatency) {
minLatency = oneLatency;
}
if (maxLatency < oneLatency) {
maxLatency = oneLatency;
}
long threshold = TimeUnit.MILLISECONDS.toMillis(DURATION);
if (oneLatency > threshold) {
totalInfo.increaseAboveThreshold();
System.out.println("[] This request went over threshold: " + threshold + " ms, and took " + oneLatency + " ms to finish, its endpoint = " + endpoint);
}
}
}
latencies.add(average);
latencies.add(minLatency);
latencies.add(maxLatency);
System.out.println("Thread " + Thread.currentThread().getId() + " is done.");
return this;
}
}
My question is: at the end of the materializationChecker.check() method, totalInfo.getTotal() is only 100 instead of 1000. I initialized a pool of 10 threads and submitted 100 tasks, so why aren't the fields of the totalInfo object incremented 1000 times?
What went wrong? Please help me understand this.
Thanks a lot!
Upvotes: 0
Views: 52
Reputation: 21113
That's because you only submitted 100 tasks.
Your code is designed to increment the TotalInfo total by one for each task submitted, since increaseTotal() is called once per call(). The fact that your executor has 10 threads is irrelevant to how the values of TotalInfo are calculated.
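To see this in isolation, here is a minimal sketch, not your actual code: TaskCountDemo and run are hypothetical names, and an AtomicInteger stands in for TotalInfo. Each task increments the shared counter exactly once, so the final value follows the number of tasks, whatever the pool size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; AtomicInteger is a stand-in for TotalInfo.
class TaskCountDemo {

    // Runs taskCount tasks on a pool of threadCount threads and
    // returns the final value of the shared counter.
    static int run(int threadCount, int taskCount) throws InterruptedException {
        AtomicInteger total = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                // One increment per task, like increaseTotal() in call().
                tasks.add(total::incrementAndGet);
            }
            // invokeAll blocks until every task has completed.
            executor.invokeAll(tasks);
        } finally {
            executor.shutdown();
        }
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(1, 100));   // prints 100
        System.out.println(run(10, 100));  // prints 100
        System.out.println(run(10, 1000)); // prints 1000
    }
}
```

If you want the total to reach 1000, you would either submit 1000 tasks or increment the counter once per request inside the loop rather than once per task.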
The 10 threads simply allow the executor to run up to 10 tasks concurrently and nothing more.
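What the pool size does control can be sketched as follows. This is an illustration under assumptions: ConcurrencyDemo and run are hypothetical names, and the short sleep stands in for the HTTP request. It records the peak number of tasks running at the same time, which can never exceed the pool size, while the number of completed tasks still equals the number submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the sleep is a placeholder for real work.
class ConcurrencyDemo {

    // Returns {peak concurrent tasks, completed tasks}.
    static int[] run(int threadCount, int taskCount) throws InterruptedException {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(() -> {
                    int now = running.incrementAndGet();
                    // Remember the highest number of simultaneously running tasks.
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(5);
                    running.decrementAndGet();
                    completed.incrementAndGet();
                    return null;
                });
            }
            // invokeAll blocks until every task has completed.
            executor.invokeAll(tasks);
        } finally {
            executor.shutdown();
        }
        return new int[] { peak.get(), completed.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(10, 100);
        System.out.println("peak concurrent tasks = " + result[0]); // at most 10
        System.out.println("completed tasks = " + result[1]);       // 100
    }
}
```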
Upvotes: 2