Reputation: 47
I think I'm a little lost with threads: I don't understand why this isn't working.
I have a program with a Server, Clients, and Nodes. A client sends grouped data (GroupedDataFrame) to the server and asks for a calculation, e.g. averages. The server distributes the task to its nodes, which calculate it and return their results to the server.
My problem is that a node returns an empty DataFrame (without the calculated results) when I use applyWithThreads. If I use the normal apply (without threads), it works fine.
From what I can tell, the statement that creates NodeResultDF does not wait for the previous call, applyWithThreads, to finish, so it sends an empty DataFrame. If I add a wait of, say, 2 seconds, it works, and so does the commented-out loop below, but I know that is bad practice.
I would like to fix this properly. How do I wait for applyWithThreads to finish (i.e. for returnToServer to be fully populated)?
class ListenFromServer extends Thread {
    public void run() {
        while (true) {
            try {
                Object obj = sInput.readObject();
                if (obj instanceof String) {
                    display((String) obj);
                } else if (obj instanceof NodeRequestGDF) {
                    display("I have received NodeRequest from client ID: " + ((NodeRequestGDF) obj).getClientID());
                    // DataFrame returnToServer = (((NodeRequestGDF) obj).groupedDF.apply(((NodeRequestGDF) obj).getFunction()));
                    DataFrame returnToServer = (((NodeRequestGDF) obj).groupedDF.applyWithThreads(((NodeRequestGDF) obj).getFunction()));
                    // while (returnToServer.size() != (((NodeRequestGDF) obj).getGroupedDF().getSize()));
                    NodeResultDF nodeResultDF = new NodeResultDF(returnToServer, ((NodeRequestGDF) obj).getClientID());
                    sendToServer(nodeResultDF);
                    display("I have returned result to Server to client ID: " + nodeResultDF.clientID);
                } else {
                    display("I have received something i do not know what is this :(");
                }
            } catch (IOException e) {
                display("Server has close the connection: " + e);
            } catch (ClassNotFoundException e2) {
                e2.printStackTrace();
            }
        }
    }
}
Here is the code of applyWithThreads:
public DataFrame applyWithThreads(Applyable fun) {
    DataFrame ret = new DataFrame();
    ArrayList<DataFrameThread> threadList = new ArrayList<>();
    for (DataFrame df : this.data) {
        DataFrameThread tmp = new DataFrameThread(df, fun, ret);
        threadList.add(tmp);
    }
    ExecutorService threadPool = Executors.newFixedThreadPool(MAX_Threads);
    for (DataFrameThread th : threadList) {
        threadPool.execute(th);
    }
    threadPool.shutdown();
    return ret;
}
And the code of DataFrameThread:
import GroupFunctions.Applyable;
public class DataFrameThread extends Thread {
    DataFrame ret;
    DataFrame DF;
    Applyable fun;
    public DataFrameThread(DataFrame df, Applyable fun, DataFrame ret) {
        this.DF = df;
        this.fun = fun;
        this.ret = ret;
    }
    @Override
    public void run() {
        DataFrame d = null;
        try {
            d = fun.apply(DF);
        } catch (InconsistentTypeException e) {
            e.printStackTrace();
        }
        synchronized (ret) {
            ret.addAnotherDF(d);
        }
    }
}
Upvotes: 0
Views: 198
Reputation: 31279
There are quite a few problems with your code. I'll try to go over them.
The critical ones:
- DataFrameThread extends Thread, but if you use a thread pool, as you do with ExecutorService, the threads are no longer created by you. Instead of extends Thread, say implements Runnable.
- Calling .shutdown() on an ExecutorService does not wait for it to stop. You could call awaitTermination after you call shutdown, but that's not how you are supposed to use an ExecutorService.
- Use submit instead of execute. submit returns a Future<?>, and you can call get() on a Future, which will block until it's complete.
- Better still, instead of Runnable, implement Callable. Callable is for tasks that return values. Instead of calling ret.addAnotherDF in your Runnable, you return your DataFrame from your Callable. Then submit will return a Future<DataFrame>, and when you call get on it, it returns the DataFrame object once the task is done.
- Only call get on the Future when you have submitted all your tasks to the ExecutorService, not after each task (if you do it after each task, you are no longer parallelizing the problem).
Important:
- Don't create a new thread pool on every applyWithThreads call. The point of a thread pool is to keep it around for as long as possible, as creating threads is an expensive operation. That's also why you shouldn't use shutdown and awaitTermination to find out when your tasks have completed; Future objects are meant for that.
Upvotes: 1
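The points in the answer above can be put together in a minimal sketch. The DataFrame and Applyable classes are not shown in full in the question, so this example substitutes plain lists of integers for the groups and a hypothetical average method for fun.apply. The class name ApplyWithFutures, the POOL field, and the pool size of 4 are likewise assumptions made for illustration, not the asker's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ApplyWithFutures {
    // Pool created once and reused across calls (size 4 is an arbitrary assumption).
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    // Hypothetical stand-in for fun.apply(df): averages one group of numbers.
    static double average(List<Integer> group) {
        return group.stream().mapToInt(Integer::intValue).average().orElse(0.0);
    }

    // Stand-in for applyWithThreads: returns only after every task has finished.
    static List<Double> applyWithThreads(List<List<Integer>> groups)
            throws InterruptedException, ExecutionException {
        // First submit all tasks, so they can run in parallel.
        List<Future<Double>> futures = new ArrayList<>();
        for (List<Integer> group : groups) {
            Callable<Double> task = () -> average(group);
            futures.add(POOL.submit(task));
        }
        // Then collect the results; get() blocks until that task is complete.
        List<Double> results = new ArrayList<>();
        for (Future<Double> future : futures) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<List<Integer>> groups =
                List.of(List.of(1, 2, 3), List.of(10, 20), List.of(7));
        System.out.println(applyWithThreads(groups)); // prints [2.0, 15.0, 7.0]
        // Shut the pool down only when the whole program is finished with it.
        POOL.shutdown();
    }
}
```

Because each task returns its own value, no shared result object or synchronized block is needed, and the results come back in submission order regardless of which task finishes first.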