Reputation: 2532
I have a class that implements both Observer and Runnable, as follows (I'm aware this example may be clumsy):
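    import java.util.Observable;
    import java.util.Observer;
    import java.util.concurrent.BlockingQueue;

    public class Triage implements Observer, Runnable {
        Observable obsrvbl;
        private BlockingQueue<String> messages;
        volatile static boolean interrupted = false;
        double updated;

        Triage(Observable obsrvbl, BlockingQueue<String> messages) {
            this.obsrvbl = obsrvbl;
            this.messages = messages;
            obsrvbl.addObserver(this);
        }

        // Runs on whichever thread calls notifyObservers()
        public void update(Observable o, Object arg) {
            updated += ((Double) arg).doubleValue();
            System.out.println("updated");
        }

        public void run() {
            String msg;
            while (!interrupted) {
                try {
                    msg = messages.take(); // blocks until a message is available
                    if (msg != null) {
                        // do something with message
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
            }
        }
    }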
The queue that run() takes from is populated at the same time that the Observable calls notifyObservers(). When the queue is empty, update() is called on the Observer as expected, but when the queue has messages waiting to be processed, update() never gets called. Is this expected behaviour?
I've seen this but it seems to be a different issue.
And here's the Observable - somewhat contrived:
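    import java.util.Observable;
    import java.util.concurrent.BlockingQueue;

    public class Producer extends Observable implements Runnable {
        volatile static boolean interrupted = false;
        private BlockingQueue<String> quotes;

        Producer(BlockingQueue<String> quotes) {
            this.quotes = quotes;
        }

        public void run() {
            String msg;
            while (!interrupted) {
                try {
                    msg = quotes.take(); // blocks until a quote is available
                    if (msg != null) {
                        setChanged(); // without this, notifyObservers() does nothing
                        notifyObservers(Double.valueOf(3.0));
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
            }
        }
    }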
Upvotes: 3
Views: 2731
Reputation: 2532
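I figured out what was wrong with this; it was an oversight on my part. As @slim suggested in a comment, the two queues were actually the same queue, so Triage's run() was consuming each message before Producer's run() could take it. Producer therefore never saw the message and never called notifyObservers(), which is why update() was not being invoked. Anyway, FWIW, here's a complete working example: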
    import java.util.Observable;
    import java.util.Observer;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class Triage implements Observer, Runnable {
        Observable obsrvbl;
        private BlockingQueue<String> messages;
        volatile static boolean interrupted = false;
        Integer updated = 0;
        private static Random rand = new Random(47);

        Triage(Observable obsrvbl, BlockingQueue<String> messages) {
            this.obsrvbl = obsrvbl;
            this.messages = messages;
            obsrvbl.addObserver(this);
        }

        // Runs on the Producer's thread, since that is where notifyObservers() is called
        public void update(Observable o, Object arg) {
            updated += ((Integer) arg);
            System.out.println("Updated: " + updated);
        }

        public void run() {
            String msg;
            while (!interrupted) {
                try {
                    msg = messages.take(); // blocks until a message is available
                    System.out.println("Run: " + msg);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
            }
        }

        public static void main(String[] args) {
            // Two separate queues: one feeds Producer, the other feeds Triage
            BlockingQueue<String> q1 = new LinkedBlockingQueue<String>();
            BlockingQueue<String> q2 = new LinkedBlockingQueue<String>();
            Producer p = new Producer(q1);
            new Thread(p).start();
            new Thread(new Triage(p, q2)).start();
            for (int i = 0; i < 20; i++) {
                int next = rand.nextInt(10) * 500;
                System.out.println("Populating: " + next);
                q1.add(Integer.toString(next));
                q2.add(Integer.toString(next));
            }
        }
    }

    class Producer extends Observable implements Runnable {
        volatile static boolean interrupted = false;
        private BlockingQueue<String> quotes;

        Producer(BlockingQueue<String> quotes) {
            this.quotes = quotes;
        }

        public void run() {
            String msg;
            while (!interrupted) {
                try {
                    msg = quotes.take(); // blocks until a quote is available
                    if (msg != null) {
                        System.out.println("Notifying: " + msg);
                        setChanged(); // without this, notifyObservers() does nothing
                        notifyObservers(Integer.valueOf(msg));
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
            }
        }
    }
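For anyone who wants to see the original mistake in isolation, here's a minimal single-threaded sketch of why sharing one queue hid the notifications. The class and method names (SharedQueueDemo, secondPollOnSharedQueue, secondPollOnSeparateQueues) are made up for illustration and are not part of the code above. It uses the non-blocking poll() instead of take() so it can run without threads. An element removed by one consumer is simply gone for the other:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, for illustration only.
class SharedQueueDemo {

    // One queue shared by two consumers: whoever polls first removes the
    // element, so the second consumer finds the queue empty.
    static String secondPollOnSharedQueue() {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>();
        shared.add("1500");
        shared.poll();        // stands in for Triage.run() getting there first
        return shared.poll(); // stands in for Producer.run(); nothing is left
    }

    // One queue per consumer: each consumer sees its own copy of the message.
    static String secondPollOnSeparateQueues() {
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>();
        q1.add("1500");
        q2.add("1500");
        q2.poll();            // Triage takes from its own queue
        return q1.poll();     // Producer's message is still there
    }

    public static void main(String[] args) {
        System.out.println("shared:   " + secondPollOnSharedQueue());    // null
        System.out.println("separate: " + secondPollOnSeparateQueues()); // 1500
    }
}
```

In the threaded version, take() blocks instead of returning null, so a Producer that loses the race just keeps waiting and never reaches setChanged() and notifyObservers().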
Upvotes: 1