Reputation: 9296
I wrote a simple producer-consumer model that reads chunks of data from Bluetooth and writes them to a file every 10k bytes. It is a standard P-C model that uses a Vector as the message holder. How do I change this so that multiple consumer threads can each read the same messages? I think the term would be "multicaster". I am running this on an Android phone, so JMS is probably not an option.
static final int MAXQUEUE = 50000;
private Vector<byte[]> messages = new Vector<byte[]>();
/**
* Put the message in the queue for the Consumer Thread
*/
private synchronized void putMessage(byte[] send) throws InterruptedException {
while ( messages.size() == MAXQUEUE )
wait();
messages.addElement( send );
notify();
}
/**
* This method is called by the consumer to see if any messages in the queue
*/
public synchronized byte[] getMessage()throws InterruptedException {
notify();
while ( messages.size() == 0 && !Thread.interrupted()) {
wait(1);
}
byte[] message = messages.firstElement();
messages.removeElement( message );
return message;
}
I am referencing code from the Message Parser section of an O'Reilly book.
Upvotes: 1
Views: 1289
Reputation: 9296
This is what I came up with as an example after digging through some code and modifying some existing examples.
package test.messaging;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
public class TestProducerConsumers {
static Broker broker;
public TestProducerConsumers(int maxSize) {
broker = new Broker(maxSize);
Producer p = new Producer();
Consumer c1 = new Consumer("One");
broker.consumers.add(c1);
c1.start();
Consumer c2 = new Consumer("Two");
broker.consumers.add(c2);
c2.start();
p.start();
}
// Test Producer, use your own message producer on a thread to call up
// broker.insert() possibly passing it the message instead.
class Producer extends Thread {
@Override
public void run() {
while (true) {
try {
broker.insert();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread {
String myName;
LinkedBlockingQueue<String> queue;
Consumer(String m) {
this.myName = m;
queue = new LinkedBlockingQueue<String>();
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
// take() blocks until a message arrives, so no busy-wait loop is needed.
System.out.println(myName + " Consumer: " + queue.take());
} catch (InterruptedException e) {
// Restore the interrupt flag and stop consuming.
Thread.currentThread().interrupt();
return;
}
}
}
}
class Broker {
public ArrayList<Consumer> consumers = new ArrayList<Consumer>();
int n;
int maxSize;
public Broker(int maxSize) {
n = 0;
this.maxSize = maxSize;
}
synchronized void insert() throws InterruptedException {
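// Only here for testing: stop after maxSize messages so the test does not
// run away and exhaust memory. Nothing calls notify(), so the producer
// parks here for good. The loop guards against spurious wakeups.
while (n == maxSize)
wait();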
System.out.println("Producer: " + n++);
for (Consumer c : consumers) {
c.queue.add("Message " + n);
}
}
}
public static void main(String[] args) {
TestProducerConsumers pc = new TestProducerConsumers(100);
}
}
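The example above uses unbounded per-consumer queues, so a slow consumer can let memory grow without limit. Below is a minimal sketch of one possible variant that gives each consumer a bounded queue, so `put()` throttles the producer much like the `MAXQUEUE` check in the question. The class name `BoundedFanOut`, the `POISON` end marker, and the capacity of 2 are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class: fans each message out to one bounded queue per consumer.
class BoundedFanOut {
    // Hypothetical end-of-stream marker, compared by identity.
    static final String POISON = new String("<end>");

    private final List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>();
    private final int capacity;

    BoundedFanOut(int capacity) {
        this.capacity = capacity;
    }

    // Call before publishing starts; returns the subscriber's private queue.
    BlockingQueue<String> subscribe() {
        BlockingQueue<String> q = new LinkedBlockingQueue<String>(capacity);
        queues.add(q);
        return q;
    }

    // put() blocks while a queue is full, so the slowest consumer sets the pace.
    void publish(String message) throws InterruptedException {
        for (BlockingQueue<String> q : queues) {
            q.put(message);
        }
    }

    // Starts the consumers, publishes the messages, and returns what each consumer saw.
    static List<List<String>> runDemo(int consumers, int messages) throws InterruptedException {
        BoundedFanOut broker = new BoundedFanOut(2);
        List<List<String>> received = new ArrayList<List<String>>();
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < consumers; i++) {
            final BlockingQueue<String> q = broker.subscribe();
            final List<String> mine = new ArrayList<String>();
            received.add(mine);
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // take() blocks until a message is available.
                        for (String m = q.take(); m != POISON; m = q.take()) {
                            mine.add(m);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (int i = 0; i < messages; i++) {
            broker.publish("Message " + i);
        }
        broker.publish(POISON);
        for (Thread t : threads) {
            t.join(); // join() makes each consumer's list safely visible here
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        for (List<String> seen : runDemo(2, 5)) {
            System.out.println("received " + seen.size() + " messages");
        }
    }
}
```

The trade-off is that one stalled consumer eventually stalls the producer for everyone. If that is not acceptable, `offer()` with a timeout is one way to drop messages for a slow consumer instead.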
Upvotes: 0
Reputation: 12610
You should definitely use a queue instead of the Vector!
Give every thread its own queue and, when a new message is received, add() the new message to every thread's queue. For flexibility, a listener pattern may be useful, too.
Edit:
OK, I feel I should add an example, too (classical observer pattern).
This is the interface that all consumers must implement:
public interface MessageListener {
public void newMessage( byte[] message );
}
A producer might look like this:
import java.util.ArrayList;
import java.util.Collection;
public class Producer {
Collection<MessageListener> listeners = new ArrayList<MessageListener>();
// Allow interested parties to register for new messages
public void addListener( MessageListener listener ) {
this.listeners.add( listener );
}
public void removeListener( Object listener ) {
this.listeners.remove( listener );
}
protected void produceMessages() {
byte[] msg = new byte[10];
// Create message and put into msg
// Tell all registered listeners about the new message:
for ( MessageListener l : this.listeners ) {
l.newMessage( msg );
}
}
}
And a consumer class could be (using a blocking queue, which does all that wait()ing and notify()ing for us):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Consumer implements MessageListener {
BlockingQueue< byte[] > queue = new LinkedBlockingQueue< byte[] >();
// This implements the MessageListener interface:
@Override
public void newMessage( byte[] message ) {
try {
queue.put( message );
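} catch (InterruptedException e) {
// put() never blocks on an unbounded queue, but it can still throw if this
// thread was interrupted, so restore the flag instead of swallowing it.
Thread.currentThread().interrupt();
}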
}
// Execute in another thread:
protected void handleMessages() throws InterruptedException {
while ( true ) {
byte[] newMessage = queue.take();
// handle the new message.
}
}
}
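To show how these pieces might be wired together, here is a minimal self-contained sketch, not a definitive implementation. It re-declares the listener interface so it compiles on its own. The names `ListenerWiringDemo`, `QueueingConsumer`, and the `STOP` marker are hypothetical, and using `CopyOnWriteArrayList` for the listener list is an assumption made so that listeners could be added or removed while another thread is iterating.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the names are illustrative only.
class ListenerWiringDemo {
    interface MessageListener {
        void newMessage(byte[] message);
    }

    // Zero-length array used as an end-of-stream marker (an assumption of this sketch).
    static final byte[] STOP = new byte[0];

    static class QueueingConsumer implements MessageListener, Runnable {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        final List<byte[]> handled = new ArrayList<byte[]>();

        @Override
        public void newMessage(byte[] message) {
            queue.add(message); // the queue is unbounded, so add() cannot fail for capacity
        }

        @Override
        public void run() {
            try {
                while (true) {
                    byte[] msg = queue.take(); // blocks until a message is available
                    if (msg == STOP) {
                        return;
                    }
                    handled.add(msg); // stand-in for real handling, e.g. writing to a file
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Delivers `count` one-byte messages to `consumerCount` consumers and returns them.
    static List<QueueingConsumer> runDemo(int consumerCount, int count) throws InterruptedException {
        List<MessageListener> listeners = new CopyOnWriteArrayList<MessageListener>();
        List<QueueingConsumer> consumers = new ArrayList<QueueingConsumer>();
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < consumerCount; i++) {
            QueueingConsumer c = new QueueingConsumer();
            consumers.add(c);
            listeners.add(c);
            Thread t = new Thread(c, "consumer-" + i);
            threads.add(t);
            t.start();
        }
        for (int i = 0; i < count; i++) {
            byte[] msg = new byte[] { (byte) i };
            for (MessageListener l : listeners) {
                l.newMessage(msg); // every consumer gets the same array instance
            }
        }
        for (MessageListener l : listeners) {
            l.newMessage(STOP);
        }
        for (Thread t : threads) {
            t.join(); // join() makes each consumer's results safely visible here
        }
        return consumers;
    }

    public static void main(String[] args) throws InterruptedException {
        for (QueueingConsumer c : runDemo(2, 5)) {
            System.out.println("handled " + c.handled.size() + " messages");
        }
    }
}
```

Note that every consumer receives a reference to the same byte array, so consumers should treat messages as read-only, or the producer should hand each listener its own copy.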
Upvotes: 0
Reputation: 9427
A pub-sub mechanism is definitely the way to achieve what you want. I am not sure why developing for Android would restrict you from using JMS, which is as simple a spec as it gets. Check out this thread on SO.
Upvotes: 1