Reputation: 4475
I have a Stateless Session Bean with a TimerService.
On timeout, it starts consuming a JMS queue. Processing a message requires access to an external resource which may be temporarily unavailable. The timeout method calls MessageConsumer.receiveNoWait() in a loop until the queue is empty or the external resource turns out to be unavailable (the two exits of the loop in the code further down).
This way I'm in control of when to restart, and I have no sleeping threads thanks to the TimerService callback.
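The control flow described above can be sketched in plain Java, without a container. This is only an illustration under assumptions: the class DrainLoopSketch, the method drainAndComputeDelay and the in-memory Queue<String> are hypothetical stand-ins for the JMS consumer, and IllegalStateException stands in for whatever exception signals that the external resource is down. The method returns the delay the next timer would be registered with.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.function.Consumer;

public class DrainLoopSketch {
    // Delays taken from the question: 10 minutes when the queue is empty,
    // 30 minutes when the external resource is unavailable.
    public static final long EMPTY_QUEUE_DELAY_MS = 10 * 60 * 1000L;
    public static final long RESOURCE_DOWN_DELAY_MS = 30 * 60 * 1000L;

    // Hypothetical helper: drains the queue and returns the delay for the next timer.
    public static long drainAndComputeDelay(Queue<String> queue, Consumer<String> handler) {
        try {
            String message;
            // peek() plays the role of receiveNoWait(): it returns null at once when empty.
            while ((message = queue.peek()) != null) {
                handler.accept(message); // business logic, may throw
                queue.remove();          // "acknowledge" only after successful processing
            }
            return EMPTY_QUEUE_DELAY_MS;
        } catch (IllegalStateException resourceUnavailable) {
            // The failing message was not removed, so it is retried on the next run.
            return RESOURCE_DOWN_DELAY_MS;
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        long delay = drainAndComputeDelay(queue, m -> {
            if (m.equals("m2")) {
                throw new IllegalStateException("external resource down");
            }
        });
        System.out.println("next timer in " + delay + " ms, messages left: " + queue.size());
    }
}
```

In this run the first message is consumed, the second one fails and stays on the queue together with the third, and the 30 minute delay is returned.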
I would like to have multiple instances of this session bean to anticipate bottlenecks on the queue:
                    +-----<ejb>-------+
                    |  timerService   |
                    |                 |              +---------------------+
                ----| onTimeout() {}  | -----------> | external dependency |
               /    |                 |           /  +---------------------+
              /     +-----------------+          /
             /                                  /
+---------+ /                                  /
|||queue|||K                                  /
+---------+ \                                /
             \      +-----<ejb>-------+     /
              \     |  timerService   |    /
               \    |                 |   /
                ----| onTimeout() {}  |
                    |                 |
                    +-----------------+
My session bean looks like this (simplified of course):
@Stateless
public class MyJob {

    @Resource
    private TimerService timerService;

    @PostConstruct
    public void init() {
        registerNewTimer(1000L); // -> problem: timerService not accessible
        System.out.println("Initial Timer created");
    }

    private void registerNewTimer(long duration) {
        TimerConfig config = new TimerConfig();
        config.setPersistent(false);
        timerService.createSingleActionTimer(duration, config);
    }

    @Timeout
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public void execute() {
        try {
            // instantiate JMS session and consumer (omitted)
            Message message;
            while ((message = consumer.receiveNoWait()) != null) {
                // business logic with message
                message.acknowledge();
            }
            // queue empty, let's stop for now and register a new timer + 10min
            registerNewTimer(10 * 60 * 1000L);
        } catch (ResourceException re) {
            // external resource unavailable, let's wait 30min
            registerNewTimer(30 * 60 * 1000L);
            // last message not acknowledged, so rolled back
        }
    }
}
I don't want to use Message Driven Beans, as I would like to stay in control of when to consume messages (see the delay logic in case of errors).
The problem:
The error is in the @PostConstruct annotated init() method: at that moment it is not allowed to use the TimerService. It is allowed when I make the session bean a @Singleton, but then I lose the possibility to process the queue in parallel. Does anyone have an idea how to solve this? If TimerService is not the right mechanism, what would be an alternative? Is there a PostConstruct alternative which allows access to the referenced resources and is only called once after instantiation?
Thanks in advance for any constructive information.
Upvotes: 2
Views: 524
Reputation: 4475
I've found a solution myself, so I'm posting it here in case it helps others.
I added a new @Singleton @Startup bean which, thanks to CDI, holds a list of all beans implementing the MyJobExecutor interface. In a JEE environment, CDI works well with EJBs, so it injects session beans too! Warning: CDI will only inject classes which directly implement MyJobExecutor, so it does not work if you have an abstract class implementing MyJobExecutor and a concrete bean extending that abstract class. The concrete bean must explicitly implement MyJobExecutor.
In the postconstruct of the startup class, I call every MyJobExecutor bean to register a new timer in its TimerService. This way I can create the first singleActionTimer(s) for every session bean.
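public interface MyJobExecutor {
    // how many timers to register for this bean, meaning how many parallel processes.
    int nrOfParallellInstances();

    // called by the startup bean to create a single-action timer on this bean's TimerService
    void registerNewTimer(long duration);
}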
import javax.annotation.PostConstruct;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@Singleton
@Startup
public class MyJobStartup {

    @Inject
    @Any
    private Instance<MyJobExecutor> myJobs;

    @PostConstruct
    public void startIt() {
        for (MyJobExecutor ajob : myJobs) {
            for (int i = 0; i < ajob.nrOfParallellInstances(); i++) {
                // register every instance with a 1 second delay between each timeout
                ajob.registerNewTimer(1000L * (i + 1));
            }
        }
    }
}
The inner loop will register multiple timers for the same bean based on the ajob.nrOfParallellInstances() value.
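To make the effect of the two nested loops concrete, here is a container-free sketch. Everything in it is an assumption for illustration: JobExecutor mirrors the shape of MyJobExecutor, RecordingJob is a hypothetical implementation that only records the requested delays instead of calling a TimerService, and startAll plays the role of startIt().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartupRegistrationSketch {

    // Hypothetical stand-in for the MyJobExecutor interface.
    public interface JobExecutor {
        int nrOfParallelInstances();
        void registerNewTimer(long durationMillis);
    }

    // Records the delays instead of creating real timers.
    public static class RecordingJob implements JobExecutor {
        private final int parallelInstances;
        public final List<Long> registeredDelays = new ArrayList<>();

        public RecordingJob(int parallelInstances) {
            this.parallelInstances = parallelInstances;
        }

        @Override
        public int nrOfParallelInstances() {
            return parallelInstances;
        }

        @Override
        public void registerNewTimer(long durationMillis) {
            registeredDelays.add(durationMillis);
        }
    }

    // Same loop structure as startIt(): one timer per requested parallel
    // instance, staggered by one second.
    public static void startAll(Iterable<? extends JobExecutor> jobs) {
        for (JobExecutor job : jobs) {
            for (int i = 0; i < job.nrOfParallelInstances(); i++) {
                job.registerNewTimer(1000L * (i + 1));
            }
        }
    }

    public static void main(String[] args) {
        RecordingJob job = new RecordingJob(3);
        startAll(Arrays.asList(job));
        System.out.println(job.registeredDelays); // prints [1000, 2000, 3000]
    }
}
```

A job asking for three parallel instances therefore gets three single-action timers, firing after 1, 2 and 3 seconds.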
However, a TimerService will not trigger a timeout when the previous timeout is still running. This blocks the parallel processing as expected :(
To solve this, I adapted the timeout method so that it does not perform the business logic itself but launches a managed thread which performs it. This way the onTimeout method ends quickly and the timer service will launch the next timeout, resulting in multiple parallel executions (each in a managed thread):
import javax.annotation.Resource;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.enterprise.concurrent.ManagedThreadFactory;
import javax.naming.InitialContext;

// The members below live inside the session bean.
// LOG is assumed to be a static logger declared in that bean.
@Resource(name = "concurrent/AsyncJobThreadFactory")
private ManagedThreadFactory threadFactory;

@Timeout
private void onTimeout(Timer timer) {
    LOG.info("Timeout triggered for " + timer.getInfo());
    Thread thread = threadFactory.newThread(new AsyncExecRunnable((String) timer.getInfo()));
    if (thread != null) {
        thread.start();
    } else {
        LOG.warn("No thread available for this job. Retry in 5 minutes.");
        registerNewTimer(1000L * 60 * 5);
    }
}

private static class AsyncExecRunnable implements Runnable {
    private final String info;

    public AsyncExecRunnable(String info) {
        this.info = info;
    }

    @Override
    public void run() {
        try {
            LOG.info("Job executor thread started for " + info);
            InitialContext ic = new InitialContext();
            // business logic. With ic you can lookup other EJBs
        } catch (Exception e) {
            LOG.error("Problem executing AsyncJobExecutor:", e);
        }
    }
}
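The hand-off idea can be tried outside a container with a plain java.util.concurrent.ThreadFactory. The sketch below is an assumption-laden illustration, not the bean itself: TimeoutHandOffSketch and runInParallel are hypothetical names, and Executors.defaultThreadFactory() stands in for the ManagedThreadFactory. Each job waits until all jobs have started, so the run can only succeed if the jobs really overlap.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class TimeoutHandOffSketch {
    private final ThreadFactory threadFactory;

    public TimeoutHandOffSketch(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    // Mirrors onTimeout(): start the job on its own thread and return at once.
    // Returns false when no thread is available, so the caller can schedule a retry.
    public boolean onTimeout(Runnable job) {
        Thread thread = threadFactory.newThread(job);
        if (thread == null) {
            return false;
        }
        thread.start();
        return true;
    }

    // Fires `jobs` timeouts back to back and reports whether all jobs ran concurrently.
    public static boolean runInParallel(int jobs) {
        TimeoutHandOffSketch bean = new TimeoutHandOffSketch(Executors.defaultThreadFactory());
        CountDownLatch allStarted = new CountDownLatch(jobs);
        CountDownLatch allDone = new CountDownLatch(jobs);
        for (int i = 0; i < jobs; i++) {
            bean.onTimeout(() -> {
                allStarted.countDown();
                try {
                    // Completes only if every other job has started as well.
                    if (allStarted.await(5, TimeUnit.SECONDS)) {
                        allDone.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            return allDone.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in parallel: " + runInParallel(3));
    }
}
```

If onTimeout ran the job inline instead of starting a thread, the first job would block waiting for the others and the check would fail, which is the serial behaviour the hand-off is meant to avoid.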
With this setup I have:
Upvotes: 0