Reputation: 1727
I have a list of sessions, and for each one I have to call a web service to set a property on it.
I am trying to make the web service calls asynchronously with CompletableFuture so that, once all of them are done, I can save the sessions to the database.
How can I do this? My code so far is below, but it doesn't work.
sessions.stream()
    .forEach(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor));
sessionService.saveAll(sessions);
EDIT:
I came up with this solution, but I'm not sure it is the correct way of doing it.
List<CompletableFuture<Void>> futures = sessions.stream()
    .map(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor))
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .join();
sessionService.saveAll(sessions);
I am using join() to make sure it waits for all the responses to return before saving the sessions.
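To make the behaviour of this pattern easier to check, here is a minimal self-contained sketch. The Session class, the simulated webServiceCall and the "bad" id are hypothetical stand-ins, not the real code. It assumes the documented behaviour of allOf: the combined future completes once every given future has completed, and join() rethrows a failure wrapped in CompletionException, so the save step would only be reached if every call succeeded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AllOfJoinSketch {
    // Hypothetical stand-in for the real session type.
    static class Session {
        final String id;
        volatile String property; // set by the simulated web service call
        Session(String id) { this.id = id; }
    }

    // Hypothetical stand-in for webServiceCall(s); fails for the id "bad".
    static void webServiceCall(Session s) {
        if (s.id.equals("bad")) {
            throw new IllegalStateException("call failed for " + s.id);
        }
        s.property = "set-for-" + s.id;
    }

    // Returns true if every call succeeded (the point where saveAll would run).
    static boolean callAll(List<Session> sessions) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Session s : sessions) {
                futures.add(CompletableFuture.runAsync(() -> webServiceCall(s), executor));
            }
            // join() blocks until all futures are done and rethrows any failure
            // wrapped in a CompletionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return true;
        } catch (CompletionException e) {
            System.out.println("At least one call failed: " + e.getCause());
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Session> ok = Arrays.asList(new Session("a"), new Session("b"));
        System.out.println(callAll(ok) + " " + ok.get(0).property);
        System.out.println(callAll(Arrays.asList(new Session("a"), new Session("bad"))));
    }
}
```

Whether a single failed call should block saving the other sessions is a design decision; this sketch simply reports the failure.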
Upvotes: 0
Views: 2581
Reputation: 1199
In short, all you need is something like this:
CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer);
You need a method that will be called on an executor (thread pool). In my case the pool size is 100. Next, call your supplier as many times as you want.
Each call to supplyAsync creates one task. I'm creating 10000 tasks. They run in parallel, up to the pool size at any one time, and each of them, upon completion, calls my 'consumer'.
Your supplier should return some sort of object that holds the response from the web service. This object then becomes the parameter of your 'consumer' method.
You might want to shut down the pool after everything is done (or in the middle, if you need to abort).
See an example below:
package com.sanjeev.java8.thread;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Caller {
    public static ExecutorService ex = Executors.newFixedThreadPool(100);

    public static void main(String[] args) throws InterruptedException {
        Caller caller = new Caller();
        caller.start();
        // Stop accepting new tasks, then wait for the submitted ones to finish.
        ex.shutdown();
        ex.awaitTermination(10, TimeUnit.MINUTES);
    }

    private void start() {
        for (int i = 0; i < 10000; i++) {
            CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer);
        }
    }

    private int supplySomething() {
        try {
            URL url = new URL("http://www.mywebservice.com");
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.connect();
            try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
                wr.write("supply-some-data".getBytes());
            }
            // try-with-resources closes the reader even if reading fails.
            try (Reader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"))) {
                for (int c; (c = in.read()) >= 0;) {
                    System.out.print((char) c);
                }
            }
            // Return the response code. I'm returning an 'int'; you should return some sort of object.
            return connection.getResponseCode();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public void consumer(Integer i) {
        // This parameter should be of the type of 'your object' that the supplier returned.
        // I got the response; add it to a list or whatever....
    }
}
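As a rough illustration of the advice to return an object rather than an int, the sketch below has the supplier return a small response holder that the next stage receives as its parameter. WsResponse, describe and callOnce are hypothetical names, and the HTTP call is simulated so the snippet runs without a network. It uses thenApply instead of thenAccept only so that the consumed value can be returned and inspected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ResponseObjectSketch {
    // Hypothetical holder for what the web service returned.
    static class WsResponse {
        final int status;
        final String body;
        WsResponse(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // Stand-in for the real HTTP call; no network access happens here.
    static WsResponse supplySomething() {
        return new WsResponse(200, "simulated-body");
    }

    // Receives exactly the object that supplySomething() returned.
    static String describe(WsResponse r) {
        return r.status + ":" + r.body;
    }

    static String callOnce() {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(ResponseObjectSketch::supplySomething, ex)
                    .thenApply(ResponseObjectSketch::describe)
                    .join();
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callOnce()); // prints "200:simulated-body"
    }
}
```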
Another example that might suit your needs better:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Caller2 {
    public static ExecutorService ex = Executors.newFixedThreadPool(2);
    private static List<String> addresses = Arrays.asList("www.google.com", "www.yahoo.com", "www.abc.com");
    // Synchronized list: consumer() may run on several pool threads at once.
    private static List<String> results = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) throws InterruptedException {
        Caller2 caller = new Caller2();
        caller.start();
        ex.shutdown();
        ex.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(results);
    }

    private void start() {
        // Iterate on the submitting thread so that each task gets exactly one URL.
        for (String url : addresses) {
            CompletableFuture.supplyAsync(() -> supplyURL(url), ex).thenAccept(this::consumer);
        }
    }

    private String supplyURL(String url) {
        // call this URL and return response;
        return "Success";
    }

    public void consumer(String result) {
        results.add(result);
    }
}
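If the results are needed as soon as the calls finish, instead of after the pool has shut down, one possible variation is to keep the futures and wait on them with allOf. This is only a sketch under assumptions: callUrl is a hypothetical stand-in that fakes the response, and callAll is an illustrative name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CollectResultsSketch {
    // Stand-in for calling the URL; returns a fake response without network access.
    static String callUrl(String url) {
        return "Success:" + url;
    }

    static List<String> callAll(List<String> urls) {
        ExecutorService ex = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(CompletableFuture.supplyAsync(() -> callUrl(url), ex));
            }
            // Wait for every call, then read the results in submission order.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join()); // already complete, so this does not block
            }
            return results;
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAll(Arrays.asList("www.google.com", "www.yahoo.com", "www.abc.com")));
    }
}
```

Because each result is read from its own future, no shared list is mutated by the pool threads, and the results come back in the same order as the input URLs.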
Upvotes: 2