Nipun Talukdar
Nipun Talukdar

Reputation: 5387

Equivalent of Go channel in Java

I have a requirement where I need to read from a set of Blocking queues. The blocking queues are created by the Library I am using. My code has to read from the queues. I don't want to create a reader thread for each of these blocking queues. Rather I want to poll them for availability of data using a single thread (or probably using 2/3 threads at max). As some of the blocking queues might not have data for long time, while some of them may get bursts of data. Polling the queues with small timeout will work, but that is not efficient at all as it still needs to keep looping over all the queues even when some of them are without data for long time. Basically, I am looking for a select/epoll(used on sockets) kind of mechanism on blocking queues. Any clue is really appreciated.

Doing that in Go is real easy though. Below code simulates the same with channels and goroutines:

package main

import "fmt"
import "time"
import "math/rand"

func sendMessage(sc chan string) {
    var i int

    for {
        i =  rand.Intn(10)
        for ; i >= 0 ; i-- {
            sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
        }
        i = 1000 + rand.Intn(32000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func sendNum(c chan int) {
    var i int 
    for  {
        i = rand.Intn(16);
        for ; i >=  0; i-- {
            time.Sleep(20 * time.Millisecond)
            c <- rand.Intn(65534)
        }
        i = 1000 + rand.Intn(24000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func main() {
    msgchan := make(chan string, 32)
    numchan := make(chan int, 32)
    i := 0
    for ; i < 8 ; i++ {
        go sendNum(numchan)
        go sendMessage(msgchan)
    }
    for {
        select {
        case msg := <- msgchan:
            fmt.Printf("Worked on  %s\n", msg)
        case x := <- numchan:
            fmt.Printf("I got %d \n", x)
        }
    }
}

Upvotes: 27

Views: 14227

Answers (4)

Gubatron
Gubatron

Reputation: 6479

I remember when I was very new to Java, not knowing threads could share the memory of the process, I would have my threads communicate using (TCP/local) Sockets. Perhaps this can also work.

Upvotes: 1

John Lee
John Lee

Reputation: 101

An another choice is here for Java6+

A BlockingDeque implementation class:

import java.lang.ref.WeakReference;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

class GoChannelPool {

    private final static GoChannelPool defaultInstance = newPool();

    private final AtomicLong serialNumber = new AtomicLong();
    private final WeakHashMap<Long, WeakReference<GoChannel>> channelWeakHashMap = new WeakHashMap<>();
    private final LinkedBlockingDeque<GoChannelObject> totalQueue = new LinkedBlockingDeque<>();

    public <T> GoChannel<T> newChannel()  {
        GoChannel<T> channel = new GoChannel<>();
        channelWeakHashMap.put(channel.getId(), new WeakReference<GoChannel>(channel));
        return channel;
    }

    public void select(GoSelectConsumer consumer) throws InterruptedException {
        consumer.accept(getTotalQueue().take());
    }

    public int size() {
        return getTotalQueue().size();
    }

    public int getChannelCount() {
        return channelWeakHashMap.values().size();
    }

    private LinkedBlockingDeque<GoChannelObject> getTotalQueue() {
        return totalQueue;
    }

    public static GoChannelPool getDefaultInstance() {
        return defaultInstance;
    }

    public static GoChannelPool newPool()  {
        return new GoChannelPool();
    }

    private GoChannelPool() {}

    private long getSerialNumber() {
        return serialNumber.getAndIncrement();
    }

    private synchronized void syncTakeAndDispatchObject() throws InterruptedException {
        select(new GoSelectConsumer() {
            @Override
            void accept(GoChannelObject t) {

                WeakReference<GoChannel> goChannelWeakReference = channelWeakHashMap.get(t.channel_id);
                GoChannel channel = goChannelWeakReference != null ? goChannelWeakReference.get() : null;
                if (channel != null) {
                    channel.offerBuffer(t);
                }
            }
        });
    }

    class GoChannel<E> {
        // Instance
        private final long id;
        private final LinkedBlockingDeque<GoChannelObject<E>> buffer = new LinkedBlockingDeque<>();

        public GoChannel() {
            this(getSerialNumber());
        }

        private GoChannel(long id) {
            this.id = id;
        }

        public long getId() {
            return id;
        }

        public E take() throws InterruptedException {
            GoChannelObject object;
            while((object = pollBuffer()) == null) {
                syncTakeAndDispatchObject();
            }

            return (E) object.data;
        }

        public void offer(E object) {
            GoChannelObject<E> e = new GoChannelObject();
            e.channel_id = getId();
            e.data = object;

            getTotalQueue().offer(e);
        }

        protected void offerBuffer(GoChannelObject<E> data) {
            buffer.offer(data);
        }

        protected GoChannelObject<E> pollBuffer() {
            return buffer.poll();
        }

        public int size() {
            return buffer.size();
        }

        @Override
        protected void finalize() throws Throwable {
            super.finalize();

            channelWeakHashMap.remove(getId());
        }
    }

    class GoChannelObject<E> {
        long channel_id;
        E data;

        boolean belongsTo(GoChannel channel) {
            return channel != null && channel_id == channel.id;
        }
    }

    abstract static class GoSelectConsumer{
        abstract void accept(GoChannelObject t);
    }
}

then we can use it in this way:

GoChannelPool pool = GoChannelPool.getDefaultInstance();
final GoChannelPool.GoChannel<Integer> numberCh = pool.newChannel();
final GoChannelPool.GoChannel<String> stringCh = pool.newChannel();
final GoChannelPool.GoChannel<String> otherCh = pool.newChannel();

ExecutorService executorService = Executors.newCachedThreadPool();
int times;
times = 2000;
final CountDownLatch countDownLatch = new CountDownLatch(times * 2);

final AtomicInteger numTimes = new AtomicInteger();
final AtomicInteger strTimes = new AtomicInteger();
final AtomicInteger defaultTimes = new AtomicInteger();

final int finalTimes = times;
executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            numberCh.offer(i);

            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});
executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            stringCh.offer("s"+i+"e");

            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});

int otherTimes = 3;
for (int i = 0; i < otherTimes; i++) {
    otherCh.offer("a"+i);
}

for (int i = 0; i < times*2 + otherTimes; i++) {
    pool.select(new GoChannelPool.GoSelectConsumer() {
        @Override
        void accept(GoChannelPool.GoChannelObject t) {
            // The data order should be randomized.
            System.out.println(t.data);

            countDownLatch.countDown();

            if (t.belongsTo(stringCh)) {
                strTimes.incrementAndGet();
                return;
            }
            else if (t.belongsTo(numberCh)) {
                numTimes.incrementAndGet();
                return;
            }

            defaultTimes.incrementAndGet();
        }
    });
}
countDownLatch.await(10, TimeUnit.SECONDS);

/**
The console output of data should be randomized.
numTimes.get() should be 2000
strTimes.get() should be 2000
defaultTimes.get() should be 3
*/

and beware that the select works only if the channels belong to the same GoChannelPool, or just use the default GoChannelPool(however the performance would be lower if too many channels share the same GoChannelPool)

Upvotes: 3

Rick-777
Rick-777

Reputation: 10268

I suggest you look into using the JCSP library. The equivalent of Go's select is called Alternative. You would only need one consuming thread, which will not need to poll the incoming channels if it switches on them with Alternative. Therefore this would be an efficient way to multiplex the source data.

It will help a lot if you are able to replace the BlockingQueues with JCSP channels. Channels behave essentially the same but provide a greater degree of flexibility regarding the fan-out or fan-in of sharing of channel ends, and in particular, the use of channels with Alternative.

For an example of usage, here is a fair multiplexer. This example demonstrates a process that fairly multiplexes traffic from its array of input channels to its single output channel. No input channel will be starved, regardless of the eagerness of its competitors.

import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

   private final AltingChannelInput[] in;
   private final ChannelOutput out;

   public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
     this.in = in;
     this.out = out;
   }

   public void run () {

     final Alternative alt = new Alternative (in);

     while (true) {
       final int index = alt.fairSelect ();
       out.write (in[index].read ());
     }
   }
 }

Note that if priSelect were used above, higher-indexed channels would be starved if lower-indexed channels were continually demanding service. Or instead of fairSelect, select could be used, but then no starvation analysis is possible. The select mechanism should only be used when starvation is not an issue.

Freedom from Deadlock

As with Go, a Java program using channels must be designed not to deadlock. The implementation of low-level concurrency primitives in Java is very hard to get right and you need something dependable. Fortunately, Alternative has been validated by formal analysis, along with the JCSP channels. This makes it a solid reliable choice.

Just to clear up on slight point of confusion, the current JCSP version is 1.1-rc5 in the Maven repos, not what the website says.

Upvotes: 17

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13535

The only way is to replace standard queues with objects of a more functional class, which notifies consumer(s) when datum is inserted in an empty queue. This class still can implement the BlockingQueue interface, so the other side (producer) see no difference. The trick is that put operation should also raise a flag and notify consumer. Consumer, after polling all threads, clears the flag and calls Object.wait().

Upvotes: 1

Related Questions