Jason S
Jason S

Reputation: 189626

java: concurrent iteration over an immutable Iterable

I have an immutable Iterable<X> with a large number of elements. (it happens to be a List<> but never mind that.)

What I would like to do is start a few parallel / asynchronous tasks to iterate over the Iterable<> with the same iterator, and I'm wondering what interface I should use.

Here's a sample implementation with the to-be-determined interface QuasiIteratorInterface:

public void process(Iterable<X> iterable)
{
   QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
   for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
   {
      SomeWorkerClass worker = new SomeWorkerClass(qit);
      worker.start();
   }
}

class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
   final private Iterator<T> iterator;
   final private Object lock = new Object();
   private ParallelIteratorWrapper(Iterator<T> iterator) { 
      this.iterator = iterator;
   }
   static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
   {
      return new ParallelIteratorWrapper(iterable.iterator());
   }
   private T getNextItem()
   {
      synchronized(lock)
      {
         if (this.iterator.hasNext())
            return this.iterator.next();
         else
            return null;
      }
   }
   /* QuasiIteratorInterface methods here */
}

Here's my problem:

Any suggestions?

Upvotes: 4

Views: 1578

Answers (2)

Jed Wesley-Smith
Jed Wesley-Smith

Reputation: 4706

Create your own Producer interface with the poll() method or equivalent (Guava's Supplier for instance). The implementation options are many but if you have an immutable random access list then you can simply maintain a thread-safe monotonic counter (AtomicInteger for instance) and call list.get(int) eg:

class ListSupplier<T> implements Supplier<T> {
  private final AtomicInteger next = new AtomicInteger();
  private final List<T> elements; // ctor injected

  …
  public <T> get() {
    // real impl more complicated due to bounds checks
    // and what to do when exhausted
    return elements.get(next.getAndIncrement());
  }
}

That is thread-safe, but you'd probably want to either return an Option style thing or null when exhausted.

Upvotes: 1

Peter Knego
Peter Knego

Reputation: 80330

Have one dispatcher thread that iterates over Iterable and dispatches elements to multiple worker threads that perform the work on the elements. You can use ThreadPoolExecutor to automate this.

Upvotes: 0

Related Questions