user1172468

Reputation: 5464

Emulating a blocking function call over a queuing interface such as JMS in Java

I have a processing engine that is exposed to me ONLY through a queuing interface.

I would like to hide the queue-based interface behind a simple blocking interface that can be called concurrently by multiple threads.

This interface will be called roughly 500 times a second from a single JVM process.

The question at a high level is: how does one transform a queue-based interface into a thread-safe blocking function call in Java?

Notes:

  1. I put the request on a Request_Q
  2. The request is a JSON object of the following form, where id is a unique id that I generate and payload is the message I want processed:

    {id:1234567890, payload:"foo"}

  3. After the engine is done processing this request, it places the response on the queue Response_Q

  4. The response message has the same form as the request object, except that the payload is the processed message; the id is the same as in the request. For example, the response to the request above could look like:

    {id:1234567890, payload:"bar"}

  5. I pop the message off the Response_Q

I wish to encapsulate this queue based processing system in a traditional blocking function call of the following form:

public String process (String payload) {

   JSONObject request = new JSONObject();
   request.set("id", /* ... some way to generate a random key */ );
   request.set("payload", payload);

   // push the request onto Request_Q

   // Question
   // What is an elegant way to organize the code from here down?
}

Upvotes: 0

Views: 90

Answers (3)

Miserable Variable

Reputation: 28752

Here's one way to do it:

  • Create a background thread that reads from the response_q
  • Create a global Map<String, Queue> that the response thread has access to

In the blocking function

  1. add a new entry to the map, with the request id as key and a new queue as value; this is the request's private queue
  2. write to the request_q
  3. wait for response in private_q
  4. when available, remove the map entry and return the response read from queue

In the background thread, in an infinite loop

  1. read response from the global response_q
  2. extract id from response
  3. identify private queue from the global map
  4. write to private queue

Some synchronization is required when accessing the shared map. Consider a better data structure than a plain Queue for the private queue.

Update

As msandiford commented, use a thread-safe ConcurrentHashMap for the global map and a BlockingQueue for each private queue.
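
The steps above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the class name BlockingFacade, the Message type, the 5-second timeout and the in-memory LinkedBlockingQueue instances standing in for Request_Q and Response_Q are all assumptions made for illustration. With real JMS, the put/take calls would be replaced by a producer and a consumer.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingFacade {
    /** Hypothetical stand-in for the JSON {id, payload} message. */
    public static final class Message {
        public final String id;
        public final String payload;
        public Message(String id, String payload) { this.id = id; this.payload = payload; }
    }

    // Assumption: in-memory queues stand in for Request_Q and Response_Q.
    private final BlockingQueue<Message> requestQ;
    private final BlockingQueue<Message> responseQ;
    // Request id -> private queue of the caller waiting for that id.
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    public BlockingFacade(BlockingQueue<Message> requestQ, BlockingQueue<Message> responseQ) {
        this.requestQ = requestQ;
        this.responseQ = responseQ;
        Thread dispatcher = new Thread(this::dispatchLoop, "response-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /** Background thread: route each response to the private queue registered under its id. */
    private void dispatchLoop() {
        try {
            while (true) {
                Message response = responseQ.take();
                BlockingQueue<String> privateQ = pending.get(response.id);
                if (privateQ != null) {          // null means the caller already gave up
                    privateQ.offer(response.payload);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // stop dispatching on interrupt
        }
    }

    /** Blocks until the matching response arrives, or fails after 5 seconds. */
    public String process(String payload) throws InterruptedException, TimeoutException {
        String id = UUID.randomUUID().toString();
        BlockingQueue<String> privateQ = new ArrayBlockingQueue<>(1);
        pending.put(id, privateQ);               // register before sending, so no response is missed
        try {
            requestQ.put(new Message(id, payload));
            String result = privateQ.poll(5, TimeUnit.SECONDS);
            if (result == null) {
                throw new TimeoutException("No response for request " + id);
            }
            return result;
        } finally {
            pending.remove(id);                  // always clean up the map entry
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Message> requestQ = new LinkedBlockingQueue<>();
        BlockingQueue<Message> responseQ = new LinkedBlockingQueue<>();
        // Fake engine for the demo only: upper-cases each payload.
        Thread engine = new Thread(() -> {
            try {
                while (true) {
                    Message m = requestQ.take();
                    responseQ.put(new Message(m.id, m.payload.toUpperCase()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        engine.setDaemon(true);
        engine.start();
        System.out.println(new BlockingFacade(requestQ, responseQ).process("foo"));
    }
}
```

Registering the private queue before writing the request matters: otherwise a fast engine could reply before the map entry exists and the response would be dropped. The timeout is there so that a lost response does not block the caller forever.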

Upvotes: 1

Alex

Reputation: 2485

You could use Apache Camel for this; a couple of common enterprise integration patterns (EIPs) apply to this type of use case.

Generally, most integration tasks fit one or more EIPs, and Camel acts as scaffolding that helps you code to those well-defined, well-solved problems.

The best fit here would probably be the request/reply pattern.

Upvotes: 1

GreyBeardedGeek

Reputation: 30088

While I think you might be better off figuring out how to make your code non-blocking instead of blocking on the queue, here's one way to do it:

In your process() method, set up a loop that peeks at the queue, and only exits when it finds the response to the message that was sent in that method invocation. This will be very inefficient, but I don't think that there's an efficient way to do this.
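
A rough sketch of that polling loop, under stated assumptions: PollingFacade and Message are hypothetical names, two in-memory ConcurrentLinkedQueue instances stand in for Request_Q and Response_Q, and the loop scans the whole response queue rather than only its head. The 1 ms sleep and 5-second deadline are arbitrary choices.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PollingFacade {
    /** Hypothetical stand-in for the JSON {id, payload} message. */
    public static final class Message {
        public final String id;
        public final String payload;
        public Message(String id, String payload) { this.id = id; this.payload = payload; }
    }

    // Assumption: in-memory queues stand in for Request_Q and Response_Q.
    private final ConcurrentLinkedQueue<Message> requestQ;
    private final ConcurrentLinkedQueue<Message> responseQ;

    public PollingFacade(ConcurrentLinkedQueue<Message> requestQ,
                         ConcurrentLinkedQueue<Message> responseQ) {
        this.requestQ = requestQ;
        this.responseQ = responseQ;
    }

    /** Sends the request, then repeatedly scans the response queue for the matching id. */
    public String process(String payload) throws InterruptedException, TimeoutException {
        String id = UUID.randomUUID().toString();
        requestQ.add(new Message(id, payload));
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (deadline - System.nanoTime() > 0) {
            // The iterator is weakly consistent, so scanning while others add/remove is safe.
            for (Message m : responseQ) {
                // Only this caller knows the id, so only it removes the matching message.
                if (m.id.equals(id) && responseQ.remove(m)) {
                    return m.payload;
                }
            }
            Thread.sleep(1);   // back off briefly instead of spinning at full speed
        }
        throw new TimeoutException("No response for request " + id);
    }
}
```

Every waiting caller rescans the shared queue on each pass, which is where the inefficiency comes from, especially at several hundred calls per second.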

Upvotes: 1
