Andrew
Andrew

Reputation: 1002

Should I keep long running threads bare or use Executors?

I am working on a streaming java application that is using several long running worker threads. The application receives data, processes it, and then sends it along toward a third party using their SDK. There is an Engine class that recieves data and submits it to Workers. The worker threads will live for as long as the application runs, which could be months if not years.

I have included sample code that represents this key part of this question.

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BarEngine implements Engine
{
   static Logger log = LoggerFactory.getLogger(BarEngine.class);

   private static final int WORKER_COUNT = 5;
   private BlockingQueue<Map<String, Object>> queue;

   private FooWorker[] workers = new FooWorker[WORKER_COUNT];

   public BarEngine()
   {
      for (int i = 0; i < WORKER_COUNT; i++)
      {
         workers[i] = new FooWorker(i, queue);
         workers[i].start();
      }
   }

   // From Engine Interface
   @Override
   public void sendEvent(final Map<String, Object> data)
   {
      try
      {
         queue.put(data);
      }
      catch (InterruptedException e)
      {
         log.error("Unexpected Exception", e);
      }
   }

   // From Engine Interface
   @Override
   public void shutDown()
   {
      // Shuts down engine

   }

   public static class FooWorker extends Thread
   {
      static Logger log = LoggerFactory.getLogger(FooWorker.class);

      private volatile boolean run = true;
      private int id;
      private BlockingQueue<Map<String, Object>> queue;
      private Client client;

      public FooWorker(int id, BlockingQueue<Map<String, Object>> queue)
      {
         this.id = id;
         this.queue = queue;
         client = Client.build(id);
      }

      @Override
      public void run()
      {
         setName("FooWorker-" + id);
         while (run)
         {
            try
            {
               Map<String, Object> data = queue.poll(5, TimeUnit.SECONDS);
               if (null != data)
               {
                  sendEvent(data);
               }
            }
            catch (Throwable e)
            {
               log.error("Unexpected Exception", e);
            }
         }
      }

      private void sendEvent(Map<String, Object> data)
      {
         try
         {
            client.submit(data);
         }
         catch (Throwable e)
         {
            log.error("Unexpected Exception", e);
         }
      }

      // dummy client classs
      public static class Client
      {
         public void submit(Map<String, Object> data)
         {
            // submits data
         }

         public static Client build(int id)
         {
            // Builds client
            return new Client();
         }
      }
   }
}

I have been doing a bit of research, and I have not found a satisfactory answer.

My question is: Should I keep these long running Threads bare like this? If not, what should I replace it with (Like ExecutorService or something else)?

Upvotes: 5

Views: 4751

Answers (2)

Majlanky
Majlanky

Reputation: 304

Yes, what you posted is exactly something for ExecutorService. Some advices:

  • There are two main interfaces Executor, ExecutorService.
  • Executors should be ended if you dont need them via shutdown/shutdownNow methods on ExecutorService interface. If not, you will face memleaks.
  • Create your ExecutorService via Executors, for example:

    Executors.newFixedThreadPool(5);

If you use Executor service, you can push data directly to ExecutorService via Runnable because ExecutorService doing queuing it self and split tasks into its workers...

Upvotes: 1

Sudheera
Sudheera

Reputation: 1947

Answering your question, if you have threads which has the same lifetime of the application, in my opinion it doesn't matter if you are using a Thread or Executer service (which is again using Threads underneath) as long as you manage the thread's life cycle properly.

From design point of view your application falls in to software category what we called a "middleware". Generally a middleware application should be efficient as well as scalable, both which are essential qualities of such server yet you have ignored both. Your application's threads run busy-wait loops per thread keeping the CPU busy at all time. Even when the incoming load is very low this keeps happening. Which is a not good quality to have for such application.

Alternatively, I'm proposing you to use a ThreadPool implementation such as ThreadPoolExecutor which already have solved what you are trying to accomplish here. ThreadPoolExecutor leverages the functionality of a BlockingQueue if all initially fired up threads are busy at the moment. also it can stop threads if the load is low and fire up again if wanted. I have coded the structure of the design I'm proposing. Take a look at the following code. I assumed that Client is not thread-safe so I'm constructing a Client per thread. If your real client implementation is thread-safe you can use one client across all threads.

import java.util.Map;
import java.util.concurrent.*;

public class BarEngine implements Engine {

    private static final int WORKER_COUNT = 5;
    private ExecutorService threadPool;

    public BarEngine() {
        this.threadPool = new ThreadPoolExecutor(1, WORKER_COUNT, 10, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));
    }

    // From Engine Interface
    @Override
    public void sendEvent(final Map<String, Object> data) {
        threadPool.submit(new FooWorker(data));
    }

    // From Engine Interface
    @Override
    public void shutDown() {
        this.threadPool.shutdown();
        // Shuts down engine

    }

    public static class FooWorker implements Runnable {

        private final Client client;
        private final Map<String, Object> data;

        public FooWorker(Map<String, Object> data) {
            client = Client.build(Thread.currentThread().getId());
            this.data = data;
        }

        @Override
        public void run() {
            try {
                if (null != data) {
                    sendEvent(data);
                }
            } catch (Throwable e) {
                //todo log
            }
        }

        private void sendEvent(Map<String, Object> data) {
            try {
                client.submit(data);
            } catch (Throwable e) {
                //todo log
            }
        }

        // dummy client classs
        public static class Client {

            public void submit(Map<String, Object> data) {
                // submits data
            }

            public static Client build(long id) {
                // Builds client
                return new Client();
            }
        }
    }
}

Upvotes: 4

Related Questions