Reputation: 5464
I have a processing engine that is available to me (exposed) ONLY through a queueing interface.
I would like to abstract the queue-based interface into a simple blocking interface that can be called concurrently by multiple threads.
This interface will be called roughly 500 times a second from a single JVM process.
The question at a high level is: how does one transform a queue-based interface into a thread-safe blocking function call in Java?
Notes:
The request is a JSON object of the following form, where id is a unique id that I generate and payload is the message I want processed:
{id:1234567890, payload:"foo"}
After the engine is done processing this request, it places the response on the queue Response_Q.
The response message has the same form as the request object, except that the payload is the processed message and the id is the same as in the request. For example, the response to the request above could look like:
{id:1234567890, payload:"bar"}
I wish to encapsulate this queue based processing system in a traditional blocking function call of the following form:
public String process (String payload) {
    JSONObject request = new JSONObject();
    request.set("id", /* ... some way to generate a random key */ );
    request.set("payload", payload);
    // push the request onto Request_Q
    // Question
    // What is an elegant way to organize the code from here down?
}
Upvotes: 0
Views: 90
Reputation: 28752
Here's one way to do it:
A Map<String, Queue> that the response thread has access to
In the blocking function
In the background thread, in an infinite loop
Some synchronization is required. Explore a better data structure for the private queue.
Update
As msandiford commented, use the thread-safe ConcurrentHashMap and a BlockingQueue.
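To make the idea concrete, here is a minimal sketch of how the map, the private queue, and the background thread could fit together. The detailed steps are not spelled out above, so treat this as one possible reading rather than the exact design intended. The names Message, BlockingFacade, pending, and dispatchLoop are made up for illustration, and plain in-memory BlockingQueue instances stand in for Request_Q and Response_Q, since the real queue API is not shown in the question. A timeout is added as an assumption so that a lost response does not block the caller forever.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the JSON {id, payload} object. */
final class Message {
    final String id;
    final String payload;

    Message(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

/** Hides a request queue and a response queue behind a blocking process() call. */
final class BlockingFacade {
    private final BlockingQueue<Message> requestQ;   // stands in for Request_Q
    private final BlockingQueue<Message> responseQ;  // stands in for Response_Q

    // id -> private one-slot queue on which the calling thread waits
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    BlockingFacade(BlockingQueue<Message> requestQ, BlockingQueue<Message> responseQ) {
        this.requestQ = requestQ;
        this.responseQ = responseQ;
        Thread dispatcher = new Thread(this::dispatchLoop, "response-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /** Background thread: route each response to the private queue registered for its id. */
    private void dispatchLoop() {
        try {
            while (true) {
                Message response = responseQ.take();
                BlockingQueue<String> slot = pending.get(response.id);
                if (slot != null) {
                    slot.offer(response.payload);  // hand the result to the waiting caller
                }
                // a response with no registered caller (e.g. after a timeout) is dropped
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Blocks until the matching response arrives or the timeout expires. */
    String process(String payload, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        String id = Long.toString(nextId.incrementAndGet());
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
        pending.put(id, slot);  // register before sending, so a fast response is not missed
        try {
            requestQ.put(new Message(id, payload));
            String result = slot.poll(timeout, unit);
            if (result == null) {
                throw new TimeoutException("no response for id " + id);
            }
            return result;
        } finally {
            pending.remove(id);  // always clean up, even on timeout or interrupt
        }
    }
}
```

A caller would then write something like facade.process("foo", 2, TimeUnit.SECONDS) from any thread. Each caller waits only on its own one-slot queue, so a single dispatcher thread can serve all of them, and the counter-based id could be swapped for a UUID if ids must be unique across processes.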
Upvotes: 1
Reputation: 2485
You could use Apache Camel for this; a couple of common enterprise integration patterns (EIPs) apply to this type of use case.
Generally, anything you need to do fits one or more EIPs, and Camel is like scaffolding that helps you code to those well-defined, well-solved problems.
The best fit here would probably be request/reply.
Upvotes: 1
Reputation: 30088
While I think you might be better off figuring out how to make your code non-blocking instead of blocking the queue, here's one way to do it:
In your process() method, set up a loop that peeks at the queue, and only exits when it finds the response to the message that was sent in that method invocation. This will be very inefficient, but I don't think that there's an efficient way to do this.
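For illustration only, the peeking loop could look roughly like the sketch below. Reply and PeekingReceiver are hypothetical names, and an in-memory thread-safe Queue stands in for Response_Q, since the real queue API is not shown in the question. It assumes every response has exactly one waiting caller, because a response nobody claims would sit at the head and stall everyone else.

```java
import java.util.Queue;

/** Hypothetical stand-in for the JSON {id, payload} response object. */
final class Reply {
    final String id;
    final String payload;

    Reply(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

/** Polls the head of a shared response queue until the caller's own reply shows up. */
final class PeekingReceiver {
    private final Queue<Reply> responseQ;  // must be a thread-safe queue implementation

    PeekingReceiver(Queue<Reply> responseQ) {
        this.responseQ = responseQ;
    }

    String awaitReply(String id) throws InterruptedException {
        while (true) {
            Reply head = responseQ.peek();
            if (head != null && head.id.equals(id)) {
                // Only the owner of an id ever removes it, so the head cannot
                // change between peek() and poll().
                responseQ.poll();
                return head.payload;
            }
            Thread.sleep(1);  // back off briefly instead of spinning at full speed
        }
    }
}
```

Every waiting thread wakes up repeatedly just to look at the head, and a reply further back in the queue cannot be claimed until everything ahead of it has been removed, which is where the inefficiency comes from.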
Upvotes: 1