restarcik

Reputation: 47

How to wait for a function to finish (threads within a thread)

I think I'm a little lost with threads, because I don't understand why this is not working.

I have a program that consists of a Server, Clients, and Nodes. A client sends grouped data (GroupedDataFrame) to the server and asks it to calculate, for example, averages. The server distributes this task to its Nodes, which do the calculation and return their results to the Server.

My problem is that a Node returns an empty DataFrame (without the calculated results) when I use applyWithThreads. If I use the normal "apply" (without threads), it works fine.

From what I can tell, the line that creates the NodeResultDF does not wait for the previous call, applyWithThreads, to finish, so it sends an empty DataFrame. It works if I add a wait of, say, 2 seconds, or if I use the commented-out loop, but I know that is bad practice.

How can I wait for applyWithThreads to finish, so that returnToServer is fully populated before I send it?

    class ListenFromServer extends Thread {
        public void run() {
            while (true) {
                try {
                    Object obj = sInput.readObject();
                    if (obj instanceof String) {
                        display((String) obj);
                    } else if (obj instanceof NodeRequestGDF) {
                        display("I have received NodeRequest from client ID: " + ((NodeRequestGDF) obj).getClientID());


                        // DataFrame returnToServer = (((NodeRequestGDF) obj).groupedDF.apply(((NodeRequestGDF) obj).getFunction()));  
                        DataFrame returnToServer = (((NodeRequestGDF) obj).groupedDF.applyWithThreads(((NodeRequestGDF) obj).getFunction()));
                        // while (returnToServer.size() != (((NodeRequestGDF) obj).getGroupedDF().getSize()));


                        NodeResultDF nodeResultDF = new NodeResultDF(returnToServer, ((NodeRequestGDF) obj).getClientID());
                        sendToServer(nodeResultDF);
                        display("I have returned result to Server to client ID: " + nodeResultDF.clientID);

                    } else {
                        display("I have received something i do not know what is this :(");
                    }
                } catch (IOException e) {
                    display("Server has close the connection: " + e);
                } catch (ClassNotFoundException e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

Here is the code of applyWithThreads:

    public DataFrame applyWithThreads(Applyable fun) {
        DataFrame ret = new DataFrame();
        ArrayList<DataFrameThread> threadList = new ArrayList<>();
        for (DataFrame df : this.data) {
            DataFrameThread tmp = new DataFrameThread(df, fun, ret);
            threadList.add(tmp);
        }
        ExecutorService threadPool = Executors.newFixedThreadPool(MAX_Threads);
        for (DataFrameThread th : threadList) {
            threadPool.execute(th);
        }
        threadPool.shutdown();
        return ret;
    }

And the code of DataFrameThread:

    import GroupFunctions.Applyable;

    public class DataFrameThread extends Thread {
        DataFrame ret;
        DataFrame DF;
        Applyable fun;

        public DataFrameThread(DataFrame df, Applyable fun, DataFrame ret) {
            this.DF = df;
            this.fun = fun;
            this.ret = ret;
        }

        @Override
        public void run() {
            DataFrame d = null;
            try {
                d = fun.apply(DF);
            } catch (InconsistentTypeException e) {
                e.printStackTrace();
            }
            synchronized (ret) {
                ret.addAnotherDF(d);
            }
        }
    }


Upvotes: 0

Views: 198

Answers (1)

Erwin Bolwidt

Reputation: 31279

There are quite a few problems with your code. I'll try to go over them.

The critical ones:

  • Your DataFrameThread extends Thread, but if you use a thread pool, as you do with ExecutorService, the threads are no longer created by you. Instead of extends Thread, write implements Runnable.
  • Calling shutdown() on an ExecutorService does not wait for it to stop. You could call awaitTermination after you call shutdown, but that's not how you are supposed to use an ExecutorService.
  • One improvement is to use submit instead of execute. submit returns a Future<?>, and you can call get() on a Future, which blocks until the task is complete.
  • Even better, instead of implementing Runnable, implement Callable. Callable is for tasks that return values. Instead of calling ret.addAnotherDF in your Runnable, you return your DataFrame from your Callable. Then submit returns a Future<DataFrame>, and calling get on it returns the DataFrame object once the task is done.
  • Note: only call get on the Futures after you have submitted all your tasks to the ExecutorService, not after each task. If you call it after each task, you are no longer parallelizing the problem.
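
The Callable and Future points above could look roughly like the following sketch. Since the DataFrame, Applyable, and GroupedDataFrame classes are not shown in full, it uses a List<Double> as a hypothetical stand-in for one group and a plain average as a stand-in for fun.apply(df); the class name CallableApplySketch and the pool size are assumptions for illustration, not part of your code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableApplySketch {

    // Hypothetical stand-in for fun.apply(df): averages one group of values.
    static double average(List<Double> group) {
        double sum = 0;
        for (double v : group) {
            sum += v;
        }
        return group.isEmpty() ? 0.0 : sum / group.size();
    }

    // Submits one Callable per group, then collects the results.
    static List<Double> applyWithThreads(ExecutorService pool, List<List<Double>> groups)
            throws InterruptedException, ExecutionException {
        // Phase 1: submit everything first, so the tasks can run in parallel.
        List<Future<Double>> futures = new ArrayList<>();
        for (List<Double> group : groups) {
            Callable<Double> task = () -> average(group);
            futures.add(pool.submit(task));
        }
        // Phase 2: get() blocks until the corresponding task has finished.
        // Results are collected in submission order, so no shared "ret"
        // object and no synchronized block are needed.
        List<Double> results = new ArrayList<>();
        for (Future<Double> future : futures) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size
        try {
            List<List<Double>> groups = Arrays.asList(
                    Arrays.asList(1.0, 2.0, 3.0),
                    Arrays.asList(10.0, 20.0));
            System.out.println(applyWithThreads(pool, groups)); // prints [2.0, 15.0]
        } finally {
            pool.shutdown(); // only so this demo JVM can exit
        }
    }
}
```

When applyWithThreads returns here, every task has already completed, so the caller never sees a partially filled result. If a task throws, get() rethrows the failure wrapped in an ExecutionException instead of silently adding null.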

Important:

  • Don't create a new ExecutorService for each applyWithThreads call. The point of a thread pool is to keep it around for as long as possible, because creating threads is an expensive operation. That is also why you shouldn't use shutdown and awaitTermination to find out when your tasks have completed; that is what Future objects are for.
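
A minimal sketch of keeping one pool around, under the same caveats: the class SharedPoolSketch, the MAX_THREADS value, and the integer sums are hypothetical stand-ins, since the real value of MAX_Threads and the DataFrame classes are not shown. It uses ExecutorService.invokeAll, which submits all tasks and returns only once every one of them has completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedPoolSketch {

    // Assumed pool size; the question's MAX_Threads value is not shown.
    private static final int MAX_THREADS = 4;

    // Created once and reused by every call, instead of once per call.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(MAX_THREADS);

    // Hypothetical stand-in for applyWithThreads: sums each group in parallel.
    static List<Integer> sumGroups(List<List<Integer>> groups)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (List<Integer> group : groups) {
            tasks.add(() -> group.stream().mapToInt(Integer::intValue).sum());
        }
        List<Integer> results = new ArrayList<>();
        // invokeAll blocks until all tasks are done and returns the
        // futures in the same order as the task list.
        for (Future<Integer> future : POOL.invokeAll(tasks)) {
            results.add(future.get());
        }
        return results;
    }

    // Call once, when the application itself is stopping.
    static void shutdownPool() {
        POOL.shutdown();
    }

    public static void main(String[] args) throws Exception {
        // Two separate calls share the same pool threads.
        System.out.println(sumGroups(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)))); // prints [6, 9]
        System.out.println(sumGroups(Arrays.asList(Arrays.asList(10), Arrays.asList(20, 30))));   // prints [10, 50]
        shutdownPool();
    }
}
```

In the Node, the pool would then be shut down only when the Node itself stops, not at the end of each request.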

Upvotes: 1
