Reputation: 173
I implemented the Spring TaskExecutor (which is the equivalent of JDK 1.5's Executor) to process notifications received from external systems.
Interface with only one method:
public interface AsynchronousService {
    void executeAsynchronously(Runnable task);
}
and the corresponding implementation:
public class AsynchronousServiceImpl implements AsynchronousService {
    private TaskExecutor taskExecutor;

    @Override
    public void executeAsynchronously(Runnable task) {
        taskExecutor.execute(task);
    }

    @Required
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}
XML configuration of the task executor (legacy application):
<bean id="taskExecutor" class="org.example.impl.NotificationPool">
    <property name="corePoolSize" value="1"/>
    <property name="maxPoolSize" value="1"/>
    <!--<property name="queueCapacity" value="100"/>-->
    <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
Both corePoolSize and maxPoolSize are set to 1, since I want the tasks to execute sequentially (the pool creates only one thread, which processes the tasks).
I want to order my tasks by the date on which I received the notification, therefore I need to override this function to allow priority ordering:
public class NotificationPool extends ThreadPoolTaskExecutor {
    @Override
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return new PriorityBlockingQueue<>(queueCapacity);
    }
}
Here is the Notification task class:
public class NotificationTask implements Runnable, Comparable<NotificationTask> {
    private final NotificationService notificationService;
    private final Notification notification;

    public NotificationTask(NotificationService notificationService,
                            Notification notification) {
        this.notificationService = notificationService;
        this.notification = notification;
    }

    // Earlier notifications sort first, so they are processed first.
    @Override
    public int compareTo(NotificationTask task) {
        return notification.getTimestamp().compareTo(task.notification.getTimestamp());
    }

    @Override
    public void run() {
        notificationService.processNotification(notification);
    }
}
And this is how I execute it:
asynchronousService.executeAsynchronously(new NotificationTask(notificationService, notification));
This works fine with a bounded queue; however, I need an unbounded queue. As you can see in the XML configuration, the line that defines the queue capacity is commented out:
<!--<property name="queueCapacity" value="100"/>-->
However, if I do this, I get an OutOfMemoryError. It seems that it tries to allocate the unbounded queue right at application startup. The ExecutorService allows unbounded queues, but I don't know how to do that here.
Upvotes: 1
Views: 1307
Reputation: 18235
According to the Javadoc for PriorityBlockingQueue:
An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError)
So basically, it's unbounded, and you don't need to care about the queue size.
However, the queue is backed by an array:
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    // Here the backing array is allocated with the full initial capacity
    this.queue = new Object[initialCapacity];
}
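The constructor argument is only an initial capacity, not a bound, and the array is allocated eagerly. If you leave queueCapacity unset, ThreadPoolTaskExecutor passes its default of Integer.MAX_VALUE to createQueue, so the constructor tries to allocate an array of that size and fails at startup. So you need to provide a reasonable initial queue size. After that, you're done.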
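To illustrate the point about the initial size, here is a minimal JDK-only sketch (not Spring code). It assumes the oversized capacity comes from a very large requested value such as Integer.MAX_VALUE; QueueCapacitySketch, createQueue and MAX_INITIAL_CAPACITY are hypothetical names made up for this example. The idea is to clamp only the initial array size, since the queue grows on its own afterwards:

```java
import java.util.concurrent.PriorityBlockingQueue;

class QueueCapacitySketch {

    // Hypothetical cap on the INITIAL array size; it is not a bound on the queue.
    static final int MAX_INITIAL_CAPACITY = 64;

    // Builds a priority queue whose initial array stays small even when the
    // requested capacity is huge (for example Integer.MAX_VALUE).
    static <T extends Comparable<T>> PriorityBlockingQueue<T> createQueue(int requestedCapacity) {
        // PriorityBlockingQueue rejects an initial capacity below 1.
        int initial = Math.max(1, Math.min(requestedCapacity, MAX_INITIAL_CAPACITY));
        return new PriorityBlockingQueue<>(initial);
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = createQueue(Integer.MAX_VALUE);
        // Add far more elements than the initial capacity; add() never blocks
        // and the backing array grows as needed.
        for (int i = 1000; i > 0; i--) {
            queue.add(i);
        }
        System.out.println("size = " + queue.size());
        System.out.println("head = " + queue.peek());
    }
}
```

The same clamping could presumably be applied inside the overridden createQueue(int queueCapacity) of NotificationPool, but I have not tested that against Spring here.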
However, as stated, as more and more elements arrive, the queue may throw an OutOfMemoryError when the internal array is full and it tries to grow it:
private void tryGrow(Object[] array, int oldCap)
But that's very unlikely, unless your project produces millions of notifications in a short time.
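For completeness, here is a rough sketch of the overall setup using only the plain JDK ThreadPoolExecutor instead of Spring. TimestampedTask is a hypothetical stand-in for NotificationTask, and processOutOfOrder and the initial capacity of 16 are made up for illustration. It uses one worker thread, a PriorityBlockingQueue with a small initial capacity, and tasks submitted with execute() so that they reach the queue unwrapped and can be compared:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SequentialPrioritySketch {

    // Hypothetical stand-in for NotificationTask: ordered by timestamp.
    static final class TimestampedTask implements Runnable, Comparable<TimestampedTask> {
        final long timestamp;
        final Runnable body;

        TimestampedTask(long timestamp, Runnable body) {
            this.timestamp = timestamp;
            this.body = body;
        }

        @Override
        public int compareTo(TimestampedTask other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public void run() {
            body.run();
        }
    }

    // Submits the timestamps in the given order and returns the order in
    // which the single worker thread actually processed them.
    static List<Long> processOutOfOrder(long... timestamps) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(16)); // 16 is only the initial size
        List<Long> processed = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // Keep the single worker busy so the following tasks pile up in the queue.
        executor.execute(new TimestampedTask(Long.MIN_VALUE, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        for (long ts : timestamps) {
            executor.execute(new TimestampedTask(ts, () -> processed.add(ts)));
        }

        gate.countDown();    // release the worker
        executor.shutdown(); // queued tasks still run after shutdown()
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(processOutOfOrder(30L, 10L, 20L));
    }
}
```

Note that submit() would wrap each task in a FutureTask, which is not Comparable, so a priority queue only works with execute() in a setup like this.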
Upvotes: 1