Reputation: 883
// multiple server instances (replicas), coordinated using MsgCoordinationService
public class Server {
ConcurrentHashMap<TxID,Future<Msg>> local_registry = new ...
MsgCoordinationService coordination_service = new ..
...
// Socket instance to communicate with a client...
public void accept(Socket s) {
new Thread(new Worker(s)).start();
}
// propose msg to coordination service, register demand to respond to client in local registry
public Future<Msg> register(Msg m) {
Future<Msg> f = new MsgFuture(); // Future handle w. reference to an empty Msg object [= response]
TxID uniqueID = coordination_service.propose(m); // transaction ID
local_registry.put(uniqueID, f);
return f;
}
// called by coordination service, guaranteeing a global order on msg deliveries
public synchronized void deliver(TxID id, Msg m) {
// ... process Msg object [request]
// ... if local_registry.containsKey(id), 'compile' the response
//     (using the Msg object from the registered Future f, i.e. f.get(); f.isDone() is true once a certain Msg flag has been set)
// now:
// ... notify waiting 'Worker' threads to check whether their Future isDone()
}
private class Worker implements Runnable {
...
public void run() {
...
Future<Msg> f = Server.this.register(request); // obtained through Socket s
while(!f.isDone())
wait();
response = f.get();
...
}
}
}
I'm implementing a replicated service: multiple servers, each client communicates with a single server instance, and create/update/delete operations are distributed by means of a coordination service that guarantees a global order on message deliveries.
Once a client establishes a new connection to a server instance, all communication is channeled through a dedicated Worker instance, which processes read requests locally and broadcasts C/U/D operations using Server.this.register(...).
register(...) itself basically records requests for future local processing and reply, and forwards a Msg object to the coordination service.
The service redelivers the Msg object via deliver(...). After the encapsulated task has been processed, the Worker instance that originally received the client request should be notified so that it can hand over the corresponding response.
For some reason, my design seems to be broken: without synchronized(this) in Worker#run(), wait() won't wait; with synchronized(this), a notifyAll() in Server#deliver(...) won't wake up the "blocked" Worker instance.
Long story short: as it turns out, I need your help, either (a) to understand the very basics of wait/notify/notifyAll, or (b) to improve my design, or (c) both (a) and (b).
Upvotes: 0
Views: 101
Reputation: 62459
A thread calling wait() or notify() must hold the lock (monitor) of the object on which the method is called; otherwise an IllegalMonitorStateException is thrown. In the general form, assuming an arbitrary object:
final Object obj = new Object();
...
synchronized(obj) {
while(/* condition */) {
obj.wait();
}
}
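To make the exception mentioned above concrete, here is a minimal sketch. The class and method names are made up for illustration and are not part of the question's code. It only shows that wait() is rejected when the calling thread does not own the object's monitor, while notifyAll() is accepted inside a synchronized block on the same object.

```java
// Hypothetical demo class, not taken from the question.
class MonitorOwnershipDemo {

    // Returns true if wait() is rejected because the caller does not own the monitor.
    static boolean waitWithoutLockThrows() {
        Object lock = new Object();
        try {
            lock.wait(10); // deliberately NOT inside synchronized (lock)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true; // thrown immediately: current thread is not the monitor owner
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if notifyAll() completes normally while the caller owns the monitor.
    static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notifyAll(); // legal even when no thread is waiting
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("wait() without lock throws: " + waitWithoutLockThrows());
        System.out.println("notifyAll() with lock succeeds: " + notifyWithLockSucceeds());
    }
}
```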
Your wait() is never released by the notifyAll() because the two calls are made on different objects: you call this.wait() inside an instance of Worker and this.notifyAll() inside an instance of Server, so this does not refer to the same object and the calls are not paired. You need to use the same instance for both calls.
You should create a single synchronization object visible across classes and threads and synchronize on that one.
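As a rough sketch of that idea, assuming a simplified setup: SharedLockDemo, deliver(String) and awaitResponse() are hypothetical stand-ins for your Server#deliver and Worker#run, and a plain String replaces your Msg/Future types. The point is only that both sides synchronize on, wait on, and notify the same lock object, and that the waiter re-checks its condition in a loop.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the Server/Worker pair; names are illustrative only.
class SharedLockDemo {
    // One lock object shared by the waiting side and the notifying side.
    private final Object lock = new Object();
    private String response; // guarded by lock; null until delivered

    // Plays the role of Server#deliver: publish the result, then wake up all waiters.
    void deliver(String msg) {
        synchronized (lock) {
            response = msg;
            lock.notifyAll();
        }
    }

    // Plays the role of Worker#run: block until a response is available.
    String awaitResponse() throws InterruptedException {
        synchronized (lock) {
            while (response == null) { // loop guards against spurious wakeups
                lock.wait();           // releases the lock while waiting
            }
            return response;
        }
    }

    // Starts a waiting thread, delivers a message, and returns what the waiter received.
    static String runDemo() {
        SharedLockDemo demo = new SharedLockDemo();
        AtomicReference<String> received = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                received.set(demo.awaitResponse());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        demo.deliver("done");
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("worker received: " + runDemo());
    }
}
```

Because the condition is checked while holding the lock, it does not matter whether deliver runs before or after the worker starts waiting: if the response is already set, the worker never calls wait() at all.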
Upvotes: 2