Stephen Ostermiller
Stephen Ostermiller

Reputation: 25535

Use Java concurrency to run a multi-threaded task until there is enough output

I have a task in Java that is currently single threaded and may or may not produce output. I need to run this task until I have 100 pieces of output from it. Here is the single threaded (greatly simplified) example version of this:

import java.security.SecureRandom;

public class Test {

    private static SecureRandom rand = new SecureRandom();

    public static String task() {
        return rand.nextDouble() > 0.5 ? "output" : null;
    }

    public static void main(String[] args) {
        int outputCount = 0;
        while (outputCount < 100) {
            String output = task();
            if (output != null) {
                outputCount++;
                System.out.println(output);
            }
        }
    }
}

I would like to run this task multi-threaded in four (4) threads. How do I implement it?

I've looked into using the Executor interfaces but they appear to be geared at running a task exactly n times, not until needed.

Some additional notes:

Upvotes: 1

Views: 1372

Answers (5)

Stephen Ostermiller
Stephen Ostermiller

Reputation: 25535

The simplest approach:

  • Use AtomicInteger to make the count thread safe (as suggested by Alexander Pogrebnyak's answer)
  • Use an ExecutorService which can invokeAll a list of Callable and wait until they all finish.
  • Put SecureRand rand and String task() as member elements of the Callable
  • Move the loop inside the Callable's call() method, have each thread complete tasks until there are no more tasks to complete.

Here is the implementation:

import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
    private static final int TASKS = 100;
    private static final int THREADS = 4;
    public static void main(String[] args) throws InterruptedException {        
        final AtomicInteger outputCount = new AtomicInteger(0);
        ExecutorService threadPool = Executors.newFixedThreadPool(THREADS);
        Collection<Callable<Object>> tasks = new ArrayList<>(THREADS);
        for (int i = 0; i < THREADS; i++) {
            tasks.add(new Callable<Object>() {
                private SecureRandom rand = new SecureRandom();
                private String task() {
                    return rand.nextDouble() > 0.5 ? Thread.currentThread().getName() : null;
                }
                @Override public Object call() {
                    for (int i; (i = outputCount.get()) < TASKS;) {
                        String output = task();
                        if (output != null) {
                            if ((i = outputCount.incrementAndGet()) < TASKS) {
                                System.out.println(output + ": " + i);
                            }
                        }
                    } return null;
                }
            });
        }
        threadPool.invokeAll(tasks);
        threadPool.shutdownNow();
        System.out.println("done");
    }
}

Upvotes: 1

Alexander Pogrebnyak
Alexander Pogrebnyak

Reputation: 45586

I think in your case you can use just an AtomicInteger that is shared between your tasks.

public class MyTask
  implements Runnable
{
  private AtomicInteger counter;

  public MyTask ( AtomicInteger counter )
  {
    this.counter = counter;
  }

  public void run ()
  {
    while ( true )
    {
      String output = task();
      if ( output != null )
      {
        int count = counter.getAndIncrement( );

        System.out.println(output);
        if ( count >= 100 )
        {
          break;
        }
      }
    }
  }

  public static String task() {
    return rand.nextDouble() > 0.5 ? "output" : null;
  }

  public static void main (
      String[] args
    ) throws InterruptedException
  {
    AtomicInteger counter = new AtomicInteger( );

    ExecutorService pool = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i)
    {
        pool.execute( new MyTask( counter ) );
    }

    // Simplified shutdown, do not use this in production
    pool.shutdown( );
    pool.awaitTermination(1, TimeUnit.HOURS);
  }
}

Upvotes: 2

Guy Chauliac
Guy Chauliac

Reputation: 650

  • I suggest using a CountDownLatch for limiting the number of output objects to 100
  • For delivering the ouput in a thread safe way you could just use a ArrayList which you have decorated with Collections.synchronizedList to make it thread safe
  • For accessing the not thread safe operation you could make it thread safe by synchronizing on the expensive object which produces the output object

    import java.security.SecureRandom;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CountDownTask implements Runnable{
        private final CountDownLatch latch;
        private final List<String> output;
        private final SecureRandom random;
    
        public CountDownTask( CountDownLatch latch, List<String> output, SecureRandom random ) {
            super();
            this.latch = latch;
            this.output = output;
            this.random = random;
        }
    
        public void run() {
            while(latch.getCount() > 0){
                if(getNextValueInAThreadSafeWay() > 0.5){
                    output.add("output");
                    latch.countDown();
                }
            }
        }
    
        private double getNextValueInAThreadSafeWay(){
            synchronized(random){
                return random.nextDouble();
            }
        }
    
        public static void main(String[] args) throws InterruptedException{
            CountDownLatch theLatch = new CountDownLatch( 100 );
            List<String> output = Collections.synchronizedList( new ArrayList<String>() );
            SecureRandom random = new SecureRandom();
            ExecutorService service = Executors.newCachedThreadPool();
            for(int i=0;i<4;i++){
               service.execute( new CountDownTask( theLatch, output, random ) );
            }
            theLatch.await( 1, TimeUnit.MINUTES );
            service.shutdown();
            System.out.println(output.size());
        }
    }
    

Upvotes: 1

mkobit
mkobit

Reputation: 47269

You could still use an Executor service but use some longer-living Runnables to run the execution and pull the reusable objects from some shared map of objects. Here is a quick example with comments that I think could work for you. EDIT: I updated my previous example to meet the requirement of at least 100 pieces of output, but possibly more:

import java.security.SecureRandom;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Example {
    private static AtomicInteger outputCount = new AtomicInteger(0);
    private static SecureRandom rand = new SecureRandom();

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> expensiveObjects = new LinkedBlockingQueue<>();
        // Random objects put in
        expensiveObjects.put(new Object());
        expensiveObjects.put(new Object());
        expensiveObjects.put(new Object());
        expensiveObjects.put(new Object());
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0 ; i < 4; i++) {
            executorService.execute(new MyRunnable(expensiveObjects));
        }
        // Arbitrary wait in this example.
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Final: " + outputCount.get());
    }

    public static String task() {
        return rand.nextDouble() > 0.5 ? "output" : null;
    }

    /**
     * Runnable that keeps executing while outputCount is less than 100.
     */
    private static class MyRunnable implements Runnable {
        private final BlockingQueue<Object> expensiveObjects;

        public MyRunnable(final BlockingQueue<Object> expensiveObjects) {
            this.expensiveObjects = expensiveObjects;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Object expensiveObject = expensiveObjects.take();
                    // Use the expensive object that you talked about and put it back for reuse when done. Not needed in this example but leaving it for now
                    String output = task();
                    expensiveObjects.put(expensiveObject);
                    if (output != null) {
                        int counter = outputCount.getAndIncrement();
                        System.out.println(counter);
                        if (counter >= 100) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Error!");
                }
            }
        }
    }
}

Upvotes: 1

yair
yair

Reputation: 9245

Well, here's my shot at it. Worked for me:

public class MakeItConcurrent {

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(4);
    private static final AtomicInteger outputCount = new AtomicInteger();
    private static final ThreadLocal<SecureRandom> threadToStringBuilder = new ThreadLocal<SecureRandom>();

    public static String task() {
        SecureRandom rand = threadToStringBuilder.get();
        if (rand == null) {
            threadToStringBuilder.set(new SecureRandom());
            rand = threadToStringBuilder.get();
        }
        return rand.nextDouble() > 0.5 ? "output" : null;
    }

    public static void doManyTasks() {
        int currOutputCount;
        while ( ( currOutputCount = outputCount.get() ) < 100) {
            String output = task();
            if (output != null) {
                // outputCount.compareAndSet(currOutputCount, currOutputCount + 1); use this if you want exactly 100 outputs
                outputCount.set(currOutputCount + 1);
                System.out.println(output);
            }
        }
        threadPool.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {            
            @Override public void run() {
                doManyTasks();
            }
        };
        threadPool.submit(runnable);
        threadPool.submit(runnable);
        threadPool.submit(runnable);
        threadPool.submit(runnable);
        while ( ! threadPool.isShutdown() ) {
            Thread.sleep(100);
        }
        System.out.println(outputCount);
    }
}

Upvotes: -1

Related Questions