Reputation: 24154
I am working on a project that will have several bundles. As an example, suppose I have 5 bundles, and each of those bundles has a method named process.
Here is what I am supposed to do: call the process method of all the bundles in parallel using multithreaded code, and then write to the database. I am not sure what the right way to do that is. Should I have five threads, one for each bundle? But in that scenario, if I have 50 bundles, will I then have 50 threads?
The following attempt is most probably flawed, and the error handling is by no means complete. But I always get an error at this line:
pool.invokeAll
And the error is-
The method invokeAll(Collection<? extends Callable<T>>, long, TimeUnit) in the type ExecutorService is not applicable for the arguments (List<ModelFramework.ProcessBundleHolderEntry>, int, TimeUnit)
Below is my method, which calls the process method of all the bundles in a multithreaded way:
public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }
    try {
        // somehow I always get an error at invokeAll method. Is there anything wrong?
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}
Secondly, here is the Callable implementation for the tasks, which I have added to the ModelFramework class:
public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}
Can anyone help me with the error I am getting? Also, can anyone tell me whether there is a problem with the above approach, or whether there is a better, more efficient way of doing the same thing?
Any help will be appreciated.
Upvotes: 0
Views: 3366
Reputation: 116858
The following attempt that I have done is most probably flawed and error handling is by no means complete. But somehow, I always get an error at this line-
I think the problem is that ProcessBundleHolderEntry should implement Callable<Object> if you want invokeAll(...) to return a List<Future<Object>>. I've just compiled your code and this fixes the problem.
Really, it seems to me it should implement Callable<Map<String, String>>. Then the call method should return the proper type:
public Map<String, String> call() throws Exception {
Then the invokeAll(...) method would return the right List<Future<Map<String, String>>>.
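Here is a minimal sketch of that typed version. BundleRegistration isn't shown in the question, so I'm assuming a hypothetical Plugin interface in place of whatever entry.getPlugin() returns; TypedProcessTask and TypedInvokeAllSketch are made-up names too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the type returned by entry.getPlugin().
interface Plugin {
    Map<String, String> process(Map<String, String> outputs);
}

// The type argument on Callable is what lets invokeAll(...) infer T.
class TypedProcessTask implements Callable<Map<String, String>> {
    private final Plugin plugin;
    private final Map<String, String> outputs;

    TypedProcessTask(Plugin plugin, Map<String, String> outputs) {
        this.plugin = plugin;
        this.outputs = outputs;
    }

    @Override
    public Map<String, String> call() {
        return plugin.process(outputs);
    }
}

class TypedInvokeAllSketch {
    static List<Map<String, String>> runAll(List<Plugin> plugins, Map<String, String> outputs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<TypedProcessTask> tasks = new ArrayList<>();
            for (Plugin plugin : plugins) {
                tasks.add(new TypedProcessTask(plugin, outputs));
            }
            // T is inferred as Map<String, String>, so no casts are needed below.
            List<Future<Map<String, String>>> futures =
                    pool.invokeAll(tasks, 30, TimeUnit.SECONDS);
            List<Map<String, String>> responses = new ArrayList<>();
            for (Future<Map<String, String>> future : futures) {
                // get() throws CancellationException if the task was cut off by the timeout.
                responses.add(future.get());
            }
            return responses;
        } finally {
            pool.shutdown();
        }
    }
}
```

The futures come back in the same order as the tasks, so responses lines up with plugins by index.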
A different (albeit a bit strange) idea would be to return this out of the call() method. Have ProcessBundleHolderEntry implement Callable<ProcessBundleHolderEntry> and record the response in the entry before returning this from call(). Then you don't need to do a get(i) on the entries to match it up, and you have the entry, the outputs, and the response in one object.
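A rough sketch of the "return this" idea, with made-up names (SelfReturningTask, SelfReturningSketch) and a placeholder body where your entry.getPlugin().process(outputs) call would go:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SelfReturningTask implements Callable<SelfReturningTask> {
    private final String name;                  // hypothetical bundle identifier
    private final Map<String, String> outputs;
    private Map<String, String> response;       // filled in by call()

    SelfReturningTask(String name, Map<String, String> outputs) {
        this.name = name;
        this.outputs = outputs;
    }

    @Override
    public SelfReturningTask call() {
        // Placeholder for entry.getPlugin().process(outputs).
        Map<String, String> result = new HashMap<>(outputs);
        result.put("processedBy", name);
        response = result;
        return this;
    }

    String getName() { return name; }
    Map<String, String> getResponse() { return response; }
}

class SelfReturningSketch {
    static List<SelfReturningTask> runAll(List<SelfReturningTask> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Future<SelfReturningTask>> futures =
                    pool.invokeAll(tasks, 30, TimeUnit.SECONDS);
            List<SelfReturningTask> finished = new ArrayList<>();
            for (Future<SelfReturningTask> future : futures) {
                // Each result carries its own name, outputs, and response,
                // so there is no need to index back into the task list.
                // Reading response after get() is safe: get() makes the
                // task's writes visible to this thread.
                finished.add(future.get());
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }
}
```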
Upvotes: 1
Reputation: 1489
Never mind, I found the actual problem.
public class ProcessBundleHolderEntry implements Callable {
should be defined as
public class ProcessBundleHolderEntry implements Callable<Object> {
in order to match the type Object in the following declaration:
List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
The signature of the invokeAll method is:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
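To illustrate, here is a small self-contained sketch in which a task declared as Callable<Object> satisfies that signature. ObjectTask and TimedInvokeAllSketch are made-up names, and the sleep stands in for real work. One related detail from the ExecutorService Javadoc: when the timed invokeAll returns, isDone() is true for every future and tasks that did not finish in time are cancelled, so isCancelled() is probably the check you want for timed-out entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Declared with a type argument, so T in invokeAll(...) is inferred as Object.
class ObjectTask implements Callable<Object> {
    private final long sleepMillis;

    ObjectTask(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    @Override
    public Object call() throws InterruptedException {
        Thread.sleep(sleepMillis); // stand-in for real work
        return "slept " + sleepMillis + " ms";
    }
}

class TimedInvokeAllSketch {
    // Returns one flag per task: true if the task was cancelled by the timeout.
    static List<Boolean> timedOutFlags(List<ObjectTask> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<Future<Object>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<Boolean> flags = new ArrayList<>();
            for (Future<Object> future : futures) {
                // isDone() is true for every future at this point;
                // isCancelled() tells the timed-out tasks apart.
                flags.add(future.isCancelled());
            }
            return flags;
        } finally {
            pool.shutdownNow();
        }
    }
}
```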
Upvotes: 1