Reputation: 5387
I need to read from a set of blocking queues. The queues are created by the library I am using, and my code has to read from them. I don't want to create a reader thread for each queue. Instead, I want to poll them for available data using a single thread (or perhaps 2 or 3 threads at most), because some of the queues might have no data for a long time while others get bursts of data. Polling the queues with a small timeout will work, but it is not efficient, as it still has to keep looping over all the queues even when some of them have been without data for a long time. Basically, I am looking for a select/epoll-style mechanism (as used on sockets) for blocking queues. Any clue is really appreciated.
Doing this in Go is really easy, though. The code below simulates the same thing with channels and goroutines:
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// sendMessage sends a burst of 1 to 10 order messages, then sleeps for
// roughly 1 to 33 seconds before the next burst.
func sendMessage(sc chan string) {
	var i int
	for {
		i = rand.Intn(10)
		for ; i >= 0; i-- {
			sc <- fmt.Sprintf("Order number %d", rand.Intn(100))
		}
		i = 1000 + rand.Intn(32000)
		time.Sleep(time.Duration(i) * time.Millisecond)
	}
}

// sendNum sends a burst of 1 to 16 numbers, 20 ms apart, then sleeps for
// roughly 1 to 25 seconds before the next burst.
func sendNum(c chan int) {
	var i int
	for {
		i = rand.Intn(16)
		for ; i >= 0; i-- {
			time.Sleep(20 * time.Millisecond)
			c <- rand.Intn(65534)
		}
		i = 1000 + rand.Intn(24000)
		time.Sleep(time.Duration(i) * time.Millisecond)
	}
}

func main() {
	msgchan := make(chan string, 32)
	numchan := make(chan int, 32)
	// Start 8 producers of each kind.
	i := 0
	for ; i < 8; i++ {
		go sendNum(numchan)
		go sendMessage(msgchan)
	}
	// A single goroutine blocks until either channel has data.
	for {
		select {
		case msg := <-msgchan:
			fmt.Printf("Worked on %s\n", msg)
		case x := <-numchan:
			fmt.Printf("I got %d \n", x)
		}
	}
}
Upvotes: 27
Views: 14227
Reputation: 6479
I remember when I was very new to Java, not knowing threads could share the memory of the process, I would have my threads communicate using (TCP/local) Sockets. Perhaps this can also work.
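A rough sketch of how that idea could look inside a single JVM, swapping real TCP sockets for java.nio's Pipe, which a Selector can watch just like a socket. The names PipeSelectSketch, SignalledQueue and selectOnce are made up for illustration, and the sketch assumes the producers can be changed to write a wake-up byte after enqueueing, which may not be possible when a library owns the queues.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PipeSelectSketch {
    /** Hypothetical pairing of a queue with a pipe that signals "data available". */
    static final class SignalledQueue<E> {
        final String name;
        final Queue<E> items = new ConcurrentLinkedQueue<>();
        final Pipe pipe;

        SignalledQueue(String name, Selector selector) throws IOException {
            this.name = name;
            this.pipe = Pipe.open();
            // The read end must be non-blocking before it can be registered.
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ, this);
        }

        /** Enqueue first, then write one wake-up byte so the selector notices. */
        void put(E item) throws IOException {
            items.add(item);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
        }
    }

    /** Blocks until at least one queue is signalled, then drains the signalled queues. */
    static List<String> selectOnce(Selector selector) throws IOException {
        List<String> out = new ArrayList<>();
        selector.select();
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            SignalledQueue<?> q = (SignalledQueue<?>) key.attachment();
            // Discard the wake-up bytes before draining, so an item added
            // later always leaves a byte behind for the next select().
            ByteBuffer scratch = ByteBuffer.allocate(64);
            while (q.pipe.source().read(scratch) > 0) {
                scratch.clear();
            }
            Object item;
            while ((item = q.items.poll()) != null) {
                out.add(q.name + ": " + item);
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SignalledQueue<String> orders = new SignalledQueue<>("orders", selector);
        SignalledQueue<Integer> numbers = new SignalledQueue<>("numbers", selector);
        numbers.put(42);
        System.out.println(selectOnce(selector));
    }
}
```

The consumer thread sleeps inside selector.select() no matter how many queues are idle, at the cost of one extra byte written per item.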
Upvotes: 1
Reputation: 101
Here is another option, for Java 7+ (the code uses the diamond operator, which Java 6 does not support).
A BlockingDeque implementation class:
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

class GoChannelPool {
    private final static GoChannelPool defaultInstance = newPool();
    private final AtomicLong serialNumber = new AtomicLong();
    private final WeakHashMap<Long, WeakReference<GoChannel>> channelWeakHashMap = new WeakHashMap<>();
    private final LinkedBlockingDeque<GoChannelObject> totalQueue = new LinkedBlockingDeque<>();

    public <T> GoChannel<T> newChannel() {
        GoChannel<T> channel = new GoChannel<>();
        channelWeakHashMap.put(channel.getId(), new WeakReference<GoChannel>(channel));
        return channel;
    }

    public void select(GoSelectConsumer consumer) throws InterruptedException {
        consumer.accept(getTotalQueue().take());
    }

    public int size() {
        return getTotalQueue().size();
    }

    public int getChannelCount() {
        return channelWeakHashMap.values().size();
    }

    private LinkedBlockingDeque<GoChannelObject> getTotalQueue() {
        return totalQueue;
    }

    public static GoChannelPool getDefaultInstance() {
        return defaultInstance;
    }

    public static GoChannelPool newPool() {
        return new GoChannelPool();
    }

    private GoChannelPool() {}

    private long getSerialNumber() {
        return serialNumber.getAndIncrement();
    }

    private synchronized void syncTakeAndDispatchObject() throws InterruptedException {
        select(new GoSelectConsumer() {
            @Override
            void accept(GoChannelObject t) {
                WeakReference<GoChannel> goChannelWeakReference = channelWeakHashMap.get(t.channel_id);
                GoChannel channel = goChannelWeakReference != null ? goChannelWeakReference.get() : null;
                if (channel != null) {
                    channel.offerBuffer(t);
                }
            }
        });
    }

    class GoChannel<E> {
        // Instance
        private final long id;
        private final LinkedBlockingDeque<GoChannelObject<E>> buffer = new LinkedBlockingDeque<>();

        public GoChannel() {
            this(getSerialNumber());
        }

        private GoChannel(long id) {
            this.id = id;
        }

        public long getId() {
            return id;
        }

        public E take() throws InterruptedException {
            GoChannelObject object;
            while((object = pollBuffer()) == null) {
                syncTakeAndDispatchObject();
            }
            return (E) object.data;
        }

        public void offer(E object) {
            GoChannelObject<E> e = new GoChannelObject();
            e.channel_id = getId();
            e.data = object;
            getTotalQueue().offer(e);
        }

        protected void offerBuffer(GoChannelObject<E> data) {
            buffer.offer(data);
        }

        protected GoChannelObject<E> pollBuffer() {
            return buffer.poll();
        }

        public int size() {
            return buffer.size();
        }

        @Override
        protected void finalize() throws Throwable {
            super.finalize();
            channelWeakHashMap.remove(getId());
        }
    }

    class GoChannelObject<E> {
        long channel_id;
        E data;

        boolean belongsTo(GoChannel channel) {
            return channel != null && channel_id == channel.id;
        }
    }

    abstract static class GoSelectConsumer{
        abstract void accept(GoChannelObject t);
    }
}
Then we can use it like this:
GoChannelPool pool = GoChannelPool.getDefaultInstance();
final GoChannelPool.GoChannel<Integer> numberCh = pool.newChannel();
final GoChannelPool.GoChannel<String> stringCh = pool.newChannel();
final GoChannelPool.GoChannel<String> otherCh = pool.newChannel();
ExecutorService executorService = Executors.newCachedThreadPool();
int times;
times = 2000;
final CountDownLatch countDownLatch = new CountDownLatch(times * 2);
final AtomicInteger numTimes = new AtomicInteger();
final AtomicInteger strTimes = new AtomicInteger();
final AtomicInteger defaultTimes = new AtomicInteger();
final int finalTimes = times;

executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            numberCh.offer(i);
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});

executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            stringCh.offer("s"+i+"e");
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});

int otherTimes = 3;
for (int i = 0; i < otherTimes; i++) {
    otherCh.offer("a"+i);
}

for (int i = 0; i < times*2 + otherTimes; i++) {
    pool.select(new GoChannelPool.GoSelectConsumer() {
        @Override
        void accept(GoChannelPool.GoChannelObject t) {
            // The data order should be randomized.
            System.out.println(t.data);
            countDownLatch.countDown();
            if (t.belongsTo(stringCh)) {
                strTimes.incrementAndGet();
                return;
            }
            else if (t.belongsTo(numberCh)) {
                numTimes.incrementAndGet();
                return;
            }
            defaultTimes.incrementAndGet();
        }
    });
}
countDownLatch.await(10, TimeUnit.SECONDS);

/**
    The console output of data should be randomized.
    numTimes.get() should be 2000
    strTimes.get() should be 2000
    defaultTimes.get() should be 3
*/
Beware that select works only across channels that belong to the same GoChannelPool. Alternatively, just use the default GoChannelPool (though performance will be lower if too many channels share the same GoChannelPool).
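Stripped of the bookkeeping, the core idea of the pool is that every channel writes into one shared queue and tags each item with its origin, so a single take() acts as the select. A minimal sketch of just that idea follows; TaggedFunnel, Tagged, send and select are illustrative names and not part of the class above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TaggedFunnel {
    /** An item plus the name of the logical channel it was sent on. */
    static final class Tagged {
        final String channel;
        final Object data;

        Tagged(String channel, Object data) {
            this.channel = channel;
            this.data = data;
        }
    }

    // The single queue that every logical channel feeds into.
    private final BlockingQueue<Tagged> shared = new LinkedBlockingQueue<>();

    /** Producers call this instead of writing to a per-channel queue. */
    void send(String channel, Object data) {
        shared.add(new Tagged(channel, data));
    }

    /** Blocks until any channel has data; this plays the role of select. */
    Tagged select() throws InterruptedException {
        return shared.take();
    }

    public static void main(String[] args) throws InterruptedException {
        TaggedFunnel funnel = new TaggedFunnel();
        funnel.send("numbers", 7);
        funnel.send("strings", "s0e");
        for (int i = 0; i < 2; i++) {
            Tagged t = funnel.select();
            System.out.println(t.channel + " -> " + t.data);
        }
    }
}
```

Because every producer contends on the same queue, this also shows why sharing one pool among many channels can cost throughput.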
Upvotes: 3
Reputation: 10268
I suggest you look into using the JCSP library. The equivalent of Go's select is called Alternative. You would only need one consuming thread, which will not need to poll the incoming channels if it switches on them with Alternative. Therefore this would be an efficient way to multiplex the source data.
It will help a lot if you are able to replace the BlockingQueues with JCSP channels. Channels behave essentially the same but provide a greater degree of flexibility regarding the fan-out or fan-in of sharing of channel ends, and in particular, the use of channels with Alternative.
For an example of usage, here is a fair multiplexer. This example demonstrates a process that fairly multiplexes traffic from its array of input channels to its single output channel. No input channel will be starved, regardless of the eagerness of its competitors.
import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

    private final AltingChannelInput[] in;
    private final ChannelOutput out;

    public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
        this.in = in;
        this.out = out;
    }

    public void run () {
        final Alternative alt = new Alternative (in);
        while (true) {
            final int index = alt.fairSelect ();
            out.write (in[index].read ());
        }
    }
}
Note that if priSelect were used above, higher-indexed channels would be starved if lower-indexed channels were continually demanding service. Or instead of fairSelect, select could be used, but then no starvation analysis is possible. The select mechanism should only be used when starvation is not an issue.
Freedom from Deadlock
As with Go, a Java program using channels must be designed not to deadlock. The implementation of low-level concurrency primitives in Java is very hard to get right and you need something dependable. Fortunately, Alternative has been validated by formal analysis, along with the JCSP channels. This makes it a solid, reliable choice.
Just to clear up one slight point of confusion: the current JCSP version is 1.1-rc5 in the Maven repos, not what the website says.
Upvotes: 17
Reputation: 13535
The only way is to replace the standard queues with objects of a more capable class, one that notifies the consumer(s) when a datum is inserted into an empty queue. This class can still implement the BlockingQueue interface, so the other side (the producer) sees no difference. The trick is that the put operation should also raise a flag and notify the consumer. The consumer, after polling all queues, clears the flag and calls Object.wait().
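A minimal sketch of such a class, assuming you are able to hand the library your own queue instances. The names NotifyingQueues, Signal, NotifyingQueue and awaitAndDrain are invented for this example. One deliberate difference from the description above: the sketch clears the flag before polling rather than after, so that a put landing in the middle of a polling pass raises the flag again instead of being missed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NotifyingQueues {
    /** Shared flag plus monitor that all queues raise on insert (hypothetical helper). */
    static final class Signal {
        private boolean raised;

        synchronized void raise() {
            raised = true;
            notifyAll();
        }

        /** Waits until some queue has raised the flag, then clears it. */
        synchronized void awaitAndClear() throws InterruptedException {
            while (!raised) {
                wait();
            }
            raised = false;
        }
    }

    /** Still a BlockingQueue for producers, but every successful insert raises the signal. */
    static final class NotifyingQueue<E> extends LinkedBlockingQueue<E> {
        private final transient Signal signal;

        NotifyingQueue(Signal signal) {
            this.signal = signal;
        }

        @Override
        public void put(E e) throws InterruptedException {
            super.put(e);
            signal.raise();
        }

        @Override
        public boolean offer(E e) {
            boolean added = super.offer(e);
            if (added) signal.raise();
            return added;
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            boolean added = super.offer(e, timeout, unit);
            if (added) signal.raise();
            return added;
        }
    }

    /** Consumer side: sleep until signalled, then poll every queue without blocking. */
    static List<Object> awaitAndDrain(Signal signal, List<? extends Queue<?>> queues)
            throws InterruptedException {
        signal.awaitAndClear();
        List<Object> out = new ArrayList<>();
        for (Queue<?> q : queues) {
            Object item;
            while ((item = q.poll()) != null) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        Signal signal = new Signal();
        NotifyingQueue<String> orders = new NotifyingQueue<>(signal);
        NotifyingQueue<Integer> numbers = new NotifyingQueue<>(signal);
        List<NotifyingQueue<?>> queues = new ArrayList<>();
        queues.add(orders);
        queues.add(numbers);
        numbers.put(42);
        System.out.println(awaitAndDrain(signal, queues));
    }
}
```

The consumer thread calls awaitAndDrain in a loop. A pass may occasionally come back empty when the flag was raised for an item that an earlier pass had already drained, which is harmless.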
Upvotes: 1