Reputation: 1002
I am working on a streaming java application that is using several long running worker threads. The application receives data, processes it, and then sends it along toward a third party using their SDK. There is an Engine class that recieves data and submits it to Workers. The worker threads will live for as long as the application runs, which could be months if not years.
I have included sample code that represents this key part of this question.
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BarEngine implements Engine
{
static Logger log = LoggerFactory.getLogger(BarEngine.class);
private static final int WORKER_COUNT = 5;
private BlockingQueue<Map<String, Object>> queue;
private FooWorker[] workers = new FooWorker[WORKER_COUNT];
public BarEngine()
{
for (int i = 0; i < WORKER_COUNT; i++)
{
workers[i] = new FooWorker(i, queue);
workers[i].start();
}
}
// From Engine Interface
@Override
public void sendEvent(final Map<String, Object> data)
{
try
{
queue.put(data);
}
catch (InterruptedException e)
{
log.error("Unexpected Exception", e);
}
}
// From Engine Interface
@Override
public void shutDown()
{
// Shuts down engine
}
public static class FooWorker extends Thread
{
static Logger log = LoggerFactory.getLogger(FooWorker.class);
private volatile boolean run = true;
private int id;
private BlockingQueue<Map<String, Object>> queue;
private Client client;
public FooWorker(int id, BlockingQueue<Map<String, Object>> queue)
{
this.id = id;
this.queue = queue;
client = Client.build(id);
}
@Override
public void run()
{
setName("FooWorker-" + id);
while (run)
{
try
{
Map<String, Object> data = queue.poll(5, TimeUnit.SECONDS);
if (null != data)
{
sendEvent(data);
}
}
catch (Throwable e)
{
log.error("Unexpected Exception", e);
}
}
}
private void sendEvent(Map<String, Object> data)
{
try
{
client.submit(data);
}
catch (Throwable e)
{
log.error("Unexpected Exception", e);
}
}
// dummy client classs
public static class Client
{
public void submit(Map<String, Object> data)
{
// submits data
}
public static Client build(int id)
{
// Builds client
return new Client();
}
}
}
}
I have been doing a bit of research, and I have not found a satisfactory answer.
Executor
. Does not cover the application long-life threads per se.Executor
but does not answer if one SHOULD manage long live threads with Executor
My question is: Should I keep these long running Threads
bare like this? If not, what should I replace it with (Like ExecutorService
or something else)?
Upvotes: 5
Views: 4751
Reputation: 304
Yes, what you posted is exactly something for ExecutorService. Some advices:
Create your ExecutorService via Executors, for example:
Executors.newFixedThreadPool(5);
If you use Executor service, you can push data directly to ExecutorService via Runnable because ExecutorService doing queuing it self and split tasks into its workers...
Upvotes: 1
Reputation: 1947
Answering your question, if you have threads which has the same lifetime of the application, in my opinion it doesn't matter if you are using a Thread or Executer service (which is again using Threads underneath) as long as you manage the thread's life cycle properly.
From design point of view your application falls in to software category what we called a "middleware". Generally a middleware application should be efficient as well as scalable, both which are essential qualities of such server yet you have ignored both. Your application's threads run busy-wait loops per thread keeping the CPU busy at all time. Even when the incoming load is very low this keeps happening. Which is a not good quality to have for such application.
Alternatively, I'm proposing you to use a ThreadPool
implementation such as ThreadPoolExecutor
which already have solved what you are trying to accomplish here. ThreadPoolExecutor
leverages the functionality of a BlockingQueue
if all initially fired up threads are busy at the moment. also it can stop threads if the load is low and fire up again if wanted. I have coded the structure of the design I'm proposing. Take a look at the following code. I assumed that Client
is not thread-safe so I'm constructing a Client
per thread. If your real client implementation is thread-safe you can use one client across all threads.
import java.util.Map;
import java.util.concurrent.*;
public class BarEngine implements Engine {
private static final int WORKER_COUNT = 5;
private ExecutorService threadPool;
public BarEngine() {
this.threadPool = new ThreadPoolExecutor(1, WORKER_COUNT, 10, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));
}
// From Engine Interface
@Override
public void sendEvent(final Map<String, Object> data) {
threadPool.submit(new FooWorker(data));
}
// From Engine Interface
@Override
public void shutDown() {
this.threadPool.shutdown();
// Shuts down engine
}
public static class FooWorker implements Runnable {
private final Client client;
private final Map<String, Object> data;
public FooWorker(Map<String, Object> data) {
client = Client.build(Thread.currentThread().getId());
this.data = data;
}
@Override
public void run() {
try {
if (null != data) {
sendEvent(data);
}
} catch (Throwable e) {
//todo log
}
}
private void sendEvent(Map<String, Object> data) {
try {
client.submit(data);
} catch (Throwable e) {
//todo log
}
}
// dummy client classs
public static class Client {
public void submit(Map<String, Object> data) {
// submits data
}
public static Client build(long id) {
// Builds client
return new Client();
}
}
}
}
Upvotes: 4