ankthepunk

Reputation:

JMS Message Driven Bean worker synchronization

We are just starting to build our JMS architecture and have the following basic setup:

  1. GlassFish v2.1
  2. MDB listening on a Topic through a TopicConnectionFactory (all on local server)

Now, the MDB spawns a worker thread when a new message arrives, and even though we have in-order delivery of messages, we need a synchronization mechanism so that the threads check for a certain condition before processing requests concurrently.

Is there a way for these threads to share data? Or are there any other mechanisms (except for database table/row locks) that we can use for synchronization?

Thanks in advance.


To clarify, I am not creating my own threads. As everyone rightly pointed out, the container does that for me. Let me explain my dilemma with an example.

-Message A arrives at t=0 which 'creates' data id 1

-Message B arrives at t=0.1 which 'updates' data id 1

Now, assuming the container spawns 2 workers to process A & B, and that it takes much more time to 'create' data than to update it, the update would be processed first and have no effect.

To be clearer,

-While processing Message B, I would look for data id 1 at t=1 (not find it, and thus finish without doing anything).

-Data id 1 would be created while processing Message A at t=2.

Upvotes: 1

Views: 6408

Answers (7)

WesternGun

Reputation: 12728

Use a local thread-safe data structure and put your data there, for example a concurrent map (ConcurrentHashMap is what I use; wrapping it in a MyCache bean is better), keyed by id. While processing Message B, loop (with a timeout, maybe), looking up in that map whether the id it needs already exists. If not, Thread.sleep(100) and try again until it is found; then remove the id from the cache and process the data with id 1.

@Inject
MyCacheBean myCacheBean;

...

// sender side:
messageSender.send(messageA);
messageSender.send(messageB);

// in the MDB handling A: after creating the data, publish its id to the cache:
// myCacheBean.put(messageA.getId(), data);

// in the MDB handling B: wait until the id shows up in the cache
while (!myCacheBean.keyExists(messageB.getId())) {
    LOG.info("Still waiting for data of id {}", messageB.getId());
    Thread.sleep(100); // 100 ms; Thread.sleep throws the checked InterruptedException, so catch or declare it
}
// the id arrived, meaning message A was received and its MessageDriven bean finished
myCacheBean.removeKeyAndValue(messageB.getId());

// update data
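For completeness, a minimal sketch of what such a MyCacheBean wrapper could look like (a singleton holding a ConcurrentHashMap; the method names simply mirror the snippet above and are not any standard API, and @Singleton needs EJB 3.1+, so on an older container like GlassFish v2.1 you would have to share the instance some other way):

import java.util.concurrent.ConcurrentHashMap;

import javax.ejb.Singleton;

@Singleton
public class MyCacheBean {

    // null keys/values are not allowed by ConcurrentHashMap (see the note below)
    private final ConcurrentHashMap<Long, Object> cache = new ConcurrentHashMap<>();

    public void put(Long id, Object data) {
        cache.put(id, data);
    }

    public boolean keyExists(Long id) {
        return cache.containsKey(id);
    }

    public Object removeKeyAndValue(Long id) {
        return cache.remove(id); // returns null if the id was never there
    }
}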

One catch is that ConcurrentHashMap does not allow null keys or values. You can wrap a null value in Optional.empty() or a custom wrapper object; see Spring's ConcurrentMapCache for an example (though I think that implementation gets it wrong).
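For example, a sketch of such null wrapping by hand (this is hand-rolled, not the Spring API):

ConcurrentHashMap<Long, Optional<Object>> cache = new ConcurrentHashMap<>();

cache.put(id, Optional.ofNullable(data));  // never stores a raw null value
Optional<Object> wrapped = cache.get(id);  // null only if the key is absent
Object value = (wrapped == null) ? null : wrapped.orElse(null);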

Also, EJB does not allow you to manage threads yourself, and Thread.sleep() counts as "managing threads yourself". A while() loop without the sleep might be acceptable, but it would spin millions of times if it has to wait, say, 3 seconds.

The shortcoming is that this cache is gone when the pod/app restarts, so the application's shutdown logic needs to check this map, stop new values from coming in, and wait until all old values are removed (ongoing tasks done). A more advanced solution would be a distributed cache/data store running as a separate service, which every pod/instance of your application queries.

Upvotes: 0

Robin

Reputation: 24262

As was mentioned by several people, you should not create your own threads in an MDB (or any other type of EJB or a servlet for that matter).

Many EJB containers will not actually allow you to create and run threads. There is one safe way to do it though, by using a WorkManager from the commonj specification, although I see no reason for it in this particular case, since the MDB is already running in its own 'worker thread'.

See info on spawning threads here for more information on why you shouldn't spawn threads in a Java EE server and how to do it safely when you need to.

Upvotes: 0

James Anderson

Reputation: 27478

As mentioned above, the JMS framework handles scheduling issues such as dispatching threads. Anything you do in this area will not only be inferior to the default behaviour, it will probably severely limit the functionality of your JMS.

The more sophisticated JMS handlers are designed to work across several nodes (= servers), so any shared-memory solution would limit you to a single JVM on a single node, which would be a pity, as the great advantage of JMS is scalability.

A possible JMSy solution would be to have a "cookie" queue with a single dummy "cookie" message to synchronise activities. When it's time for your process to perform a contentious activity, it "gets with wait" the single message from the "cookie" queue; when the contentious work is complete, it puts the cookie back on the queue. The magic of JMS will handle nearly all of the blocking, waiting and error recovery.
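A rough sketch of that pattern with the plain JMS API (the queue name and doContentiousWork() are made up for illustration; it assumes javax.jms.* imports and an injected or JNDI-looked-up ConnectionFactory):

void runContentiousSection(ConnectionFactory connectionFactory) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue cookieQueue = session.createQueue("COOKIE.QUEUE"); // made-up queue name

    // "get with wait": blocks until the single cookie message is available,
    // so only one consumer at a time can enter the contentious section
    Message cookie = session.createConsumer(cookieQueue).receive();
    try {
        doContentiousWork(); // placeholder for the actual contentious activity
    } finally {
        // put the cookie back so the next waiter can proceed, even if our work failed
        session.createProducer(cookieQueue).send(cookie);
        connection.close();
    }
}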

Upvotes: 2

Thraidh

Reputation: 690

The real problem is that the Application Server usually spawns the mentioned workers by itself. While JMS guarantees that messages are consumed in the same order as they are produced, at least within one producer, the MDB spec explicitly states that this order is not preserved (because of the mentioned workers). See section 5.4.11 of JSR-000220 Enterprise JavaBeans 3.0 Final Release (ejbcore).

There is no portable and 100% reliable way to circumvent this. Due to the nature of the workers race conditions are introduced which cannot be controlled.

Fortunately most Application Servers have ways, albeit proprietary and incompatible, to configure the number of workers.

For JBoss and ActiveMQ this works:

@PoolClass(value = org.jboss.ejb3.StrictMaxPool.class, maxSize = 1)
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSessions", propertyValue = "1"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "TEST.FOO")
})
@ResourceAdapter("activemq-ra.rar")
public class NewMessageBean implements MessageListener { ... }

In this case, "maxSessions" is the number of workers. It may be different with other JMS providers, but that should point you in the right direction.

Upvotes: 4

michael

Reputation: 9769

Looks like the question completely changed in nature after everyone already responded to it. I'll chip in a very belated response for posterity; this assumes there is some type of message ID that can be used for ordering. You say "we have in order delivery of messages", but you don't say exactly how this is achieved.

...Since you've clarified that you're not starting your own threads, you basically have a race condition between the "creation" of data @ ID=1 and its subsequent "update". I assume you're locking the data @ ID=1 while it's being created and/or updated. So, there are two possibilities:

  1. The "create data" message arrives first: (1) lock ID=1, (2) create the data, (3) release the lock, (4) apply the pending "update".
  2. The "update" arrives before the "create": (1) lock ID=1, (2) create the missing data (i.e., do an 'upsert': insert the data even though it's not there), (3) release the lock, (4) ignore the pending "create data" message (the 'create' message has a lower sequence number than the 'update').
    • If the message sender is allowed to send updates and inserts concurrently (without request/response), then they really should have some sort of sequence number. If so, when the 'insert' arrives, its sequence number will be less than the current value in that row, so it can be ignored. Or, if the 'create' message type is distinct from an 'update' message type, then 'creates' can always be ignored if the data already exists (see the sketch after this list).
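A rough sketch of that sequence-number check in plain Java (dataStore and DataRecord are hypothetical stand-ins for whatever persistence layer is actually in use):

// dataStore and DataRecord are placeholders, not a real API
void onDataMessage(long id, long sequenceNumber, String payload) {
    DataRecord existing = dataStore.findById(id); // null if not created yet
    if (existing == null) {
        // covers both a normal "create" and an "update" that overtook its "create" (upsert)
        dataStore.insert(new DataRecord(id, sequenceNumber, payload));
    } else if (sequenceNumber > existing.getSequenceNumber()) {
        dataStore.update(id, sequenceNumber, payload); // newer message wins
    }
    // else: stale message (e.g. the late "create"), safe to ignore
}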

I think the question you really have is how to synchronize on the data. Basically, threads share objects and use a mutex (mutual exclusion) to allow a single thread access to the data, causing other threads to block. This can be done simply via Java's low-level synchronization facilities (the "synchronized" keyword), or via built-in classes that assist with this ( http://java.sun.com/docs/books/tutorial/essential/concurrency/highlevel.html ).
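For illustration only (and bearing in mind the other answers' warnings about doing your own locking inside EJBs), a minimal example of guarding shared data with the synchronized keyword:

import java.util.HashMap;
import java.util.Map;

// every method synchronizes on the same monitor (this), so only one thread
// can create or update a record at a time; other threads block until it exits
public class SharedData {

    private final Map<Long, String> records = new HashMap<>();

    public synchronized void create(Long id, String value) {
        records.put(id, value);
    }

    public synchronized boolean update(Long id, String value) {
        if (!records.containsKey(id)) {
            return false; // nothing to update yet
        }
        records.put(id, value);
        return true;
    }
}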

Upvotes: 0

John M

Reputation: 13229

Pedant alert! I'm the kind of guy that reads the actual specs for technologies.

The EJB spec version 3.0, section 21.1.2 (Programming Restrictions), disallows managing threads in your code. Here's the language and the rationale ...

The enterprise bean must not attempt to manage threads. The enterprise bean must not attempt to start, stop, suspend, or resume a thread, or to change a thread’s priority or name. The enterprise bean must not attempt to manage thread groups.

These functions are reserved for the EJB container. Allowing the enterprise bean to manage threads would decrease the container’s ability to properly manage the runtime environment.

So if you do what you're saying, the EJB police will come knocking on your door in the middle of the night and take you away. Or your app might malfunction and the vendor will laugh when you complain. Or nothing bad at all will happen.

But, as duffymo says, why do this? If you want the scalability offered by lots of threads, can you configure that in for your MDB? The point of EJBs is to handle stuff like that for you.

Upvotes: 6

duffymo

Reputation: 308763

I don't see why the MDB has to spawn a worker thread. There's a thread pool associated with the message listeners in JMS. That's the thread that's supposed to be doing the work.

The EJB spec says no thread spawning in your beans. The container handles threading. That includes MDBs as well.

The listener should be processing the message it takes off the queue. The data it needs should be in the message. What's the need for sharing?

I think your approach goes against recommended EJB practices.

Upvotes: 4
