Reputation: 1253
I'm writing a message processing application (email) that I want to have an outgoing queue. The way I've designed this is with a singleton queue class, ThreadedQueueSender, backed by an ExecutorService and a BlockingQueue. Additionally, a pool of javax.mail.Transport objects is used to obtain and release connections to the outgoing SMTP server.
This class exposes a method, add(MimeMessage), that adds messages to the work queue (BlockingQueue).
At instantiation of the class, the ExecutorService is initialized to a ThreadPoolExecutor with a fixed number of threads, let's say 5. Each thread's run() method is an infinite loop that only exits when it detects an interrupt (when ExecutorService.shutdownNow() is called).
This run method uses BlockingQueue.poll() to take messages from the work queue until no more are available without blocking, then requests a Transport object from the connection pool, opens the connection, sends all the messages it has retrieved, closes the connection and returns the Transport object to the pool.
This works, but I feel I am not taking full advantage of the ExecutorService by having a fixed number of threads that run for the life of the application. Also, I am managing the work queue myself instead of letting the concurrency frameworks handle it. How would others implement this functionality? Is it better to wrap each incoming message in a Runnable, then execute the sending logic?
Thank you, any comments are appreciated.
Ryan
Upvotes: 3
Views: 5121
Reputation: 193
You should create tasks for every piece of work that should be done by your executor service.
For example, you could create a Callable, "MailSendingTask", that holds the MimeMessage and wraps the mail sending. Queue these MailSendingTasks by submitting them to your executor. Now your executor decides how many threads will be created (configure it by setting the lower and upper thread pool bounds).
You only need to create 2 or 3 classes/interfaces.
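To make the task-per-message idea concrete, here is a minimal sketch. It uses a plain String in place of MimeMessage and a hypothetical MailSender interface in place of the real Transport code so that it runs without the JavaMail jar; both names are my assumptions, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MailSendingExample {

    /** Hypothetical stand-in for the code that talks to the SMTP server. */
    public interface MailSender {
        void send(String message) throws Exception;
    }

    /** One task per message; the executor's own queue replaces the hand-managed one. */
    public static class MailSendingTask implements Callable<Boolean> {
        private final String message;   // would be a MimeMessage in the real application
        private final MailSender sender;

        public MailSendingTask(String message, MailSender sender) {
            this.message = message;
            this.sender = sender;
        }

        @Override
        public Boolean call() throws Exception {
            sender.send(message);
            return Boolean.TRUE;
        }
    }

    /** Submits one task per message and returns how many completed successfully. */
    public static int sendAll(List<String> messages, MailSender sender) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (String message : messages) {
                results.add(executor.submit(new MailSendingTask(message, sender)));
            }
            int sent = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {   // get() rethrows a send failure as ExecutionException
                    sent++;
                }
            }
            return sent;
        } finally {
            executor.shutdown();
        }
    }
}
```

In a long-running application you would keep a single executor alive and call submit() from add(MimeMessage) rather than creating a pool per batch as this sketch does.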
You could even go further by creating an extra service that manages the mail socket connections, which could be used by the MailSendingTask.
If you want to add cancellation, you should look at the classes Future and FutureTask.
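As a rough illustration of cancellation through Future, the sketch below submits a long-running task, waits until it has started, and cancels it with interruption. The class and method names are made up for the example, and the sleeping task just stands in for a slow send.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelExample {

    /** Returns {cancel() result, isCancelled(), executor terminated}. */
    public static boolean[] submitAndCancel() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        // Stand-in for a slow send; it reacts to interruption by returning.
        Future<?> future = executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        started.await();                          // make sure the task is running
        boolean cancelled = future.cancel(true);  // true = interrupt the worker thread

        executor.shutdown();
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { cancelled, future.isCancelled(), terminated };
    }
}
```

Note that cancel(true) only interrupts the thread; the task itself has to respond to the interrupt, and whether a blocking socket write does so depends on the underlying I/O.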
Upvotes: 1
Reputation: 76241
Wrapping up the messages in a Runnable would force you to either make the work queue unbounded or deal with what happens when the queue is full. ThreadPoolExecutor gives you a few policies for dealing with this situation: give up (reject the task), run it yourself on the submitting thread, or discard something. See the ThreadPoolExecutor javadoc for details.
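Here is a small sketch of the "run it yourself" option, assuming a deliberately tiny pool (one thread, queue capacity one) so that the third task is certain to be rejected. The class name and sizes are only for illustration; the other built-in handlers are AbortPolicy, DiscardPolicy and DiscardOldestPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class RejectionPolicyExample {

    /** Returns the name of the thread that ended up running the rejected task. */
    public static String runWithCallerRuns() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),          // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy());   // overflow runs on the submitter

        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Task 1 occupies the only worker thread until released.
        executor.execute(() -> {
            workerBusy.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        workerBusy.await();

        // Task 2 fills the single queue slot.
        executor.execute(() -> { });

        // Task 3 cannot be queued, so CallerRunsPolicy runs it right here.
        AtomicReference<String> ranOn = new AtomicReference<>();
        executor.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn.get();
    }
}
```

CallerRunsPolicy gives you a crude form of back-pressure: whoever calls add() is slowed down while the pool is saturated, which may or may not be acceptable for your producers.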
Another thing you can do is allow the thread pool to create threads beyond its core size. How threads are spawned and when they are reaped is described by the first 4 arguments to the ThreadPoolExecutor constructor (corePoolSize, maximumPoolSize, keepAliveTime and its unit). How well this works in reality will depend on the resource bottleneck.
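A quick sketch of how those arguments interact, with numbers picked purely for the example: core size 2, maximum 4, and a queue of capacity 2. One detail worth knowing is that ThreadPoolExecutor only starts threads beyond the core size once the queue is full, so the queue you pass as the fifth argument matters too.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthExample {

    /** Submits six blocking tasks and reports how many threads the pool created. */
    public static int poolSizeUnderLoad() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L, TimeUnit.SECONDS,                // idle time before extra threads are reaped
                new ArrayBlockingQueue<Runnable>(2)); // extra threads start only when this is full

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Tasks 1-2 start the core threads, 3-4 are queued, 5-6 start extra threads.
        for (int i = 0; i < 6; i++) {
            executor.execute(blocked);
        }
        int size = executor.getPoolSize();

        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return size;
    }
}
```

With an unbounded queue such as a default LinkedBlockingQueue, the pool in this sketch would never grow past 2 threads, because the queue never fills up.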
Also, what's the advantage of BlockingQueue.poll in your situation, rather than BlockingQueue.take? Both are interruptible, and your thread only has one task, so blocking is not undesirable.
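For comparison, a worker loop built on take() could look roughly like this. It is a sketch with String messages and a made-up helper method, not your actual run() method; the point is only that take() blocks while the queue is empty and throws InterruptedException when the thread is interrupted, which gives you the exit path for free.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class TakeLoopExample {

    /** Handles messages until 'expected' have been seen, then interrupts the worker. */
    public static List<String> drainUntilInterrupted(BlockingQueue<String> queue, int expected)
            throws InterruptedException {
        List<String> handled = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch done = new CountDownLatch(expected);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take();  // blocks while the queue is empty
                    handled.add(message);           // the real code would send the mail here
                    done.countDown();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting (e.g. by shutdownNow()): leave the loop.
            }
        });

        worker.start();
        done.await();
        worker.interrupt();
        worker.join();
        return handled;
    }
}
```

If you still want to batch several messages per connection, one option is to take() the first message and then call drainTo() to collect whatever else is already waiting.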
Upvotes: 0