PentiumPro200

Reputation: 631

Java Threading - Wait for data to be returned, but don't block other threads

I have 7 threads running in an executor thread pool that process data and occasionally need data from a listener instance running on another thread. The listener sends a request over a socket to a server and, some time later, when the result arrives, hands the data back to the worker thread that asked for it. I want to block that worker thread until the requested data is returned, but I don't want to block the listener from making other requests on behalf of the other worker threads. How do I do that?

Upvotes: 0

Views: 268

Answers (2)

bowmore

Reputation: 11308

If one thread hands off work to another thread and then simply waits for the result, you don't need the second thread to do the work. You may need a class that does the work, but it can be called on the same thread. If the same instance is used by multiple threads, some synchronization may be needed. But the bottom line is this:

You don't need the listener thread. Replace it with a component that handles a request, and call it synchronously.
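A minimal sketch of that idea, assuming the socket round trip can be wrapped in a single blocking call. `DataClient` and its `fetch` method are hypothetical stand-ins, not part of the question's code; here `fetch` just builds a string where a real client would write the request and read the reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SynchronousClientDemo {

    // Hypothetical component that replaces the listener thread. A real version
    // would perform the socket round trip inside fetch(), on the caller's thread.
    static final class DataClient {
        private int requestsServed = 0;

        // synchronized because every worker shares this one instance
        synchronized String fetch(String key) {
            requestsServed++;
            return "data-for-" + key;
        }

        synchronized int requestsServed() {
            return requestsServed;
        }
    }

    // Runs one task per worker; each task blocks only its own thread while fetching.
    static List<String> runWorkers(DataClient client, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                final String key = "unit-" + i;
                tasks.add(() -> client.fetch(key));
            }
            List<String> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the tasks
            for (Future<String> done : pool.invokeAll(tasks)) {
                results.add(done.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(new DataClient(), 7));
    }
}
```

Note that a synchronized `fetch` on one shared connection serializes the round trips, so one slow request delays the other workers. If that matters, giving each worker its own connection is one way around it.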

Edit

Given your own answer, your problem is a bit clearer. As @JimN suggests, you probably want to hand out a Future to the worker thread, backed by a CompletableFuture that the listener keeps in a Map keyed by request id until the response arrives.

Sample code:

public class WorkUnitProcessor implements Runnable {

    // ...

    @Override
    public void run() {
        while(true) {
            WorkUnit work = master.getNextWorkUnit();
            if(work == null) return;
            doWork(work);
        }
    }

    public void doWork(WorkUnit work) {

        //Do some work...

        try {
            DataRequest dataRequest = createRequest(work);

            Future<Response> future = server.getData(dataRequest);
            Response response = future.get();                       // this call blocks until the Response is available.

            //finish doing work

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // handle e.getCause()
        }

    }

    // ...
}

public class Server implements DataSourceDrivenCallback {

    private final DataSource dataSource;

    private Map<Integer, CompletableFuture<Response>> openRequests = new ConcurrentHashMap<>();

    public Server(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public synchronized void incomingDataCallback(int requestId, ChunkOfData requestedData) {
        CompletableFuture<Response> responseHolder = openRequests.remove(requestId);  // get the responseHolder
        if (responseHolder != null) {
            responseHolder.complete(toResponse(requestedData));                     // make the response available.
        }
    }

    // synchronized on the same lock as incomingDataCallback, so a fast response
    // cannot be looked up before its future has been registered in openRequests
    public synchronized Future<Response> getData(DataRequest datarequest) {
        int requestId = dataSource.submitRequest(serializeAndTranslateRequest(datarequest));
        CompletableFuture<Response> future = new CompletableFuture<>();
        openRequests.put(requestId, future);
        return future;
    }

    // ...
}
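To try the request-id map without a real socket, here is a small self-contained sketch of the same pattern. It rests on assumptions: `Ticket` and `PendingRequests` are hypothetical names, the payload is a plain String, and the request id is generated locally rather than returned by `submitRequest` as in the sample above. A plain thread plays the listener and answers the requests out of order.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PendingRequestsDemo {

    // Hypothetical pairing of a locally generated request id with its future.
    static final class Ticket {
        final int id;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Ticket(int id) {
            this.id = id;
        }
    }

    // Hypothetical registry of requests that are still waiting for a response.
    static final class PendingRequests {
        private final Map<Integer, Ticket> pendingById = new ConcurrentHashMap<>();
        private final AtomicInteger nextId = new AtomicInteger();

        // Called by a worker: the ticket is registered before anything is sent.
        Ticket register() {
            Ticket ticket = new Ticket(nextId.incrementAndGet());
            pendingById.put(ticket.id, ticket);
            return ticket;
        }

        // Called by the listener: returns false for unknown or already answered ids.
        boolean complete(int id, String payload) {
            Ticket ticket = pendingById.remove(id);
            return ticket != null && ticket.future.complete(payload);
        }

        int pending() {
            return pendingById.size();
        }
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        Ticket first = requests.register();
        Ticket second = requests.register();

        // Stand-in for the listener: responses arrive in reverse order.
        Thread listener = new Thread(() -> {
            requests.complete(second.id, "payload-2");
            requests.complete(first.id, "payload-1");
        });
        listener.start();

        // join() blocks only the calling thread until its own response is there.
        System.out.println(first.future.join());
        System.out.println(second.future.join());
    }
}
```

Each waiting thread receives the payload matching its own id regardless of arrival order, and the listener thread never blocks on a worker.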

Upvotes: 2

PentiumPro200

Reputation: 631

I think this might work. What I was looking for is described here:

https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html

It's the ability to make a thread wait until it is notified by the thread whose result it is waiting for. Seems easy to use.

public class DataProcessor {
    private List<WorkUnit> work;
    private Server server;

    public DataProcessor(List<WorkUnit> work, int numprocessors) {
        this.work = work;
        this.server = new Server();      // assign the field (not a local) before the workers are created
        setupProcessors(numprocessors);
    }

    private void setupProcessors(int numprocessors) {
        for(int i = 0; i < numprocessors; i++) {
            WorkUnitProcessor worker = new WorkUnitProcessor(this, server);
            worker.start();
        }
    }

    public synchronized WorkUnit getNextWorkUnit() {
        if(work.isEmpty()) return null;
        return work.remove(0);
    }
}

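public class WorkUnitProcessor extends Thread {
    private final DataProcessor master;
    private final Server server;

    public WorkUnitProcessor(DataProcessor master, Server server) {
        this.master = master;
        this.server = server;
    }

    @Override
    public void run() {
        while(true) {
            WorkUnit work = master.getNextWorkUnit();
            if(work == null) return;
            doWork(work);
        }
    }

    public void doWork(WorkUnit work) {
        //Do some work...
        DataRequest datarequest = createRequest(work);   // building the request is elided here
        server.getData(datarequest, this);
        synchronized(datarequest) {                      // wait() requires holding the monitor of the object waited on
            while(!datarequest.filled) {                 // re-check after every wakeup
                try {
                    datarequest.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // keep the interrupt status and give up
                    return;
                }
            }
        }
        //finish doing work
    }
}

public class Server implements DataSourceDrivenCallback {
    private final DataSource ds;
    private final Map<Integer, OpenRequest> openrequests = new HashMap<>();

    public Server() {
        //setup socket and establish communication with server through DataSource object
        ds = new DataSource(<ID>, <Socket>);
    }

    public synchronized void getData(DataRequest datarequest, WorkUnitProcessor workerthread) {
        int requestid = ds.submitRequest(serializeAndTranslateRequest(datarequest));
        openrequests.put(requestid, new OpenRequest(workerthread, datarequest));
    }

    @Override
    public void incomingDataCallback(int requestid, ChunkOfData requesteddata) {
        OpenRequest request;
        synchronized(this) {                             // same lock as getData, so the entry is registered first
            request = openrequests.remove(requestid);
        }
        if(request == null) return;                      // unknown or already answered request
        synchronized(request.datarequest) {              // notifyAll() requires the monitor the worker waits on
            request.datarequest.storeData(requesteddata);    // assumed to set datarequest.filled
            request.datarequest.notifyAll();
        }
    }
}

public class OpenRequest {
    final WorkUnitProcessor workerthread;
    final DataRequest datarequest;
    //other details about request

    OpenRequest(WorkUnitProcessor workerthread, DataRequest datarequest) {
        this.workerthread = workerthread;
        this.datarequest = datarequest;
    }
}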
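For reference, the guarded-block pattern from the linked tutorial can be reduced to a small self-contained sketch. `ResponseSlot` and `requestAndWait` are hypothetical names used only for illustration, and a sleeping thread stands in for the listener. The two points that matter are that `wait()` and `notifyAll()` are both called while holding the same monitor, and that the condition is re-checked in a loop.

```java
class GuardedSlotDemo {

    // Hypothetical one-shot handoff slot shared by a waiting worker and the listener.
    static final class ResponseSlot {
        private String value;
        private boolean filled;

        // Called by the listener thread when the data has arrived.
        synchronized void fill(String newValue) {
            value = newValue;
            filled = true;
            notifyAll();            // wake every thread waiting on this slot
        }

        // Called by the worker thread; blocks until fill() has run.
        synchronized String await() throws InterruptedException {
            while (!filled) {       // loop guards against spurious wakeups
                wait();             // releases the monitor while waiting
            }
            return value;
        }
    }

    // Starts a stand-in listener that delivers the payload after a delay,
    // then blocks the calling thread until the payload is available.
    static String requestAndWait(String payload, long delayMillis) {
        ResponseSlot slot = new ResponseSlot();
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            slot.fill(payload);
        });
        listener.start();
        try {
            return slot.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(requestAndWait("chunk-of-data", 100));
    }
}
```

Because `filled` is set before `notifyAll()` and checked before `wait()`, the worker also returns correctly when the response arrives before it starts waiting.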

Upvotes: 0
