john

Reputation: 11679

How can I make sure each thread works on a different Process object to avoid thread-safety issues?

I have two processes, as shown below. Each of them has a run and a shutdown method:

Process processA = new ProcessA("processA", getProcessAProperties());
Process processB = new ProcessB("processB", getProcessBProperties());

Below is a simple example of how I am running ProcessA with its own thread pool. There are three threads, and each thread gets its own Process object to work on. Now I want to extend this in a more generic way so that it works for both of the processes shown above.

public static void main(String[] args) {
  int numberOfThreads = 3;
  // final so the anonymous shutdown hook below can reference it on any Java version
  final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

  final List<Process> processes = new ArrayList<>();
  for (int i = 0; i < numberOfThreads; i++) {
    // each thread works on different Process object
    Process processA = new ProcessA("processA", getProcessAProperties());
    processes.add(processA);
    executor.submit(processA);
  }

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (Process process : processes) {
        process.shutdown();
      } 
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  });
}

So I created a ProcessHandler to solve the above problem in a generic way, but it has a thread-safety issue:

public final class ProcessHandler {
  private final ExecutorService executorServiceProcess;
  private final Process process;
  private final Thread shutdownHook = new Thread() {
    @Override
    public void run() {
      process.shutdown();
      executorServiceProcess.shutdown();
    }
  };

  // in this constructor my code is reusing the same 
  // process instance for each thread in the pool 
  // which is a problem for my application, how to fix this?
  public ProcessHandler(Process process, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    this.process = process;
    Runtime.getRuntime().addShutdownHook(shutdownHook);     
    for (int i = 0; i < poolSize; i++) {
      executorServiceProcess.submit(process);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().removeShutdownHook(shutdownHook);
    shutdownHook.start();
    try {
      shutdownHook.join();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }
}

And this is the way my main method looks now:

public static void main(String[] args) {

    Process processA = new ProcessA("processA", getProcessAProperties());
    Process processB = new ProcessB("processB", getProcessBProperties());

    // processA will run with three threads in its own thread pool
    ProcessHandler processHandlerA = new ProcessHandler (processA, 3);
    // processB will run with two threads in its own thread pool
    ProcessHandler processHandlerB = new ProcessHandler (processB, 2);

    // now I can call shutdown on them
    processHandlerA.shutdown();
    processHandlerB.shutdown();
}

As you can see in the ProcessHandler constructor above, I am reusing the same Process instance for every thread in the pool, which is not what I want. I want each thread to work on a different Process instance, just as in my first main method for ProcessA, where each thread gets its own Process object.

What is the best way to solve this design problem? I am also open to redesigning my ProcessHandler to solve this the right way.

Upvotes: 0

Views: 128

Answers (1)

Xephi

Reputation: 431

Maybe try something like this:

// Replace Process process by a list of Process
List<Process> processes = new ArrayList<Process>();

private final Thread shutdownHook = new Thread() {
    @Override
    public void run() {
      for (Process process : processes)
        process.shutdown();
      executorServiceProcess.shutdown();
    }
};

public ProcessHandler(Process process, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    Runtime.getRuntime().addShutdownHook(shutdownHook);     
    for (int i = 0; i < poolSize; i++) {
      // Get a deep copy of the process
      Process p = process.clone();
      processes.add(p);
      executorServiceProcess.submit(p);
    }
}
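
The clone() call above only works if Process supports cloning, which the question does not show. Here is a minimal sketch of what that could look like, assuming a base class that holds a name and a Properties object. CloneableProcess and DemoProcess are hypothetical names used for illustration, not the asker's real types.

```java
import java.util.Properties;

// Hypothetical stand-in for the question's Process base type (the real one is not shown).
abstract class CloneableProcess implements Runnable, Cloneable {
    private final String name;
    private Properties properties; // not final: clone() swaps in a copy

    CloneableProcess(String name, Properties properties) {
        this.name = name;
        this.properties = properties;
    }

    String getName() { return name; }

    Properties getProperties() { return properties; }

    abstract void shutdown();

    @Override
    public CloneableProcess clone() {
        try {
            CloneableProcess copy = (CloneableProcess) super.clone();
            // Give the copy its own Properties table so threads do not share it.
            copy.properties = (Properties) properties.clone();
            return copy;
        } catch (CloneNotSupportedException e) {
            // Cannot happen because this class implements Cloneable.
            throw new AssertionError(e);
        }
    }
}

// Hypothetical concrete process that idles until shutdown() is called.
class DemoProcess extends CloneableProcess {
    private volatile boolean running = true;

    DemoProcess(String name, Properties properties) {
        super(name, properties);
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    void shutdown() { running = false; }
}
```

Any other mutable state held by a subclass (connections, buffers, and so on) would need the same treatment in its own clone() override, otherwise the copies still share it.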

Another way, without a clone method, is to introduce an abstract process class and give ProcessHandler one constructor per concrete type. Don't forget to adapt the rest of your code accordingly:

public abstract class AProcess extends Process {

    private String name;
    private Properties properties;
    public AProcess(String name, Properties properties)
    {
        this.setName(name);
        this.setProperties(properties);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }
}

You now have to implement your ProcessA and ProcessB:

public class ProcessA extends AProcess {

    public ProcessA(String name, Properties properties)
    {
        super(name, properties);
    }
}

Now create your ProcessHandler:

public final class ProcessHandler {
  private final ExecutorService executorServiceProcess;
  private final List<AProcess> processes = new ArrayList<AProcess>();
  private final Thread shutdownHook = new Thread() {
    @Override
    public void run() {
      for (AProcess process : processes)
          process.shutdown();
      executorServiceProcess.shutdown();
    }
  };

  public ProcessHandler(ProcessA process, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    Runtime.getRuntime().addShutdownHook(shutdownHook);     
    for (int i = 0; i < poolSize; i++) {
      ProcessA p = new ProcessA(process.getName(), process.getProperties());
      processes.add(p);
      executorServiceProcess.submit(p);
    }
  }

  public ProcessHandler(ProcessB process, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    Runtime.getRuntime().addShutdownHook(shutdownHook);     
    for (int i = 0; i < poolSize; i++) {
      ProcessB p = new ProcessB(process.getName(), process.getProperties());
      processes.add(p);
      executorServiceProcess.submit(p);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().removeShutdownHook(shutdownHook);
    shutdownHook.start();
    try {
      shutdownHook.join();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }
}

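And use it like this: ProcessHandler processHandlerA = new ProcessHandler(processA, 3); Note that processA must be declared with the type ProcessA (not Process) so that the matching constructor is selected.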
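
If adding one constructor per process type becomes tedious, a different technique is to pass a factory instead of an instance, so the handler can ask for a fresh object per thread. The following is only a sketch under assumptions: Worker stands in for the question's Process type, FactoryProcessHandler and DemoWorker are hypothetical names, and the JVM shutdown hook is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the question's Process type: runnable plus shutdown().
interface Worker extends Runnable {
    void shutdown();
}

// Hypothetical handler that builds one Worker per pool thread from a factory.
final class FactoryProcessHandler {
    private final ExecutorService executor;
    private final List<Worker> workers = new ArrayList<>();

    FactoryProcessHandler(Supplier<? extends Worker> factory, int poolSize) {
        this.executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            Worker worker = factory.get(); // fresh instance for each submitted task
            workers.add(worker);
            executor.submit(worker);
        }
    }

    List<Worker> workers() {
        return Collections.unmodifiableList(workers);
    }

    void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical worker that idles until shutdown() is called.
final class DemoWorker implements Worker {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void shutdown() { running = false; }
}
```

With the question's classes, and assuming Process exposes run and shutdown as described, the call site would look something like new FactoryProcessHandler(() -> new ProcessA("processA", getProcessAProperties()), 3). The factory must return a new object on every call; a lambda that returns a captured instance would bring back the original sharing problem.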

Upvotes: 1
