Reputation: 752
I have a multi threaded application which receives asynchronous messages. How to wait till message is received (or timeout)?Is there a better way to do this with wait() and notify()?
I tried to do this with raw thread sleep with atomic references.
class SignalReceiver{
String message;
Boolean messageReceived; // used AtomicBoolean
void receive(String message){
this.message = message;
messageReceived = true; // set message flag
}
void waitTillMessageReceived(long timeout){
if(!messageReceived){ // message could be received before
while(!messageReceived){
Thread.sleep(100);
// wait only till timeout
}
}
messageReceived = false; // reset message flag
}
}
Upvotes: 3
Views: 335
Reputation: 344
This is an exampre with wait / notify
public class SignalReceiver{
String message;
public static void main(String[] args) {
final SignalReceiver sr=new SignalReceiver();
new Thread(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
}
System.out.println("message sent");
sr.receive("hello");
}).start();;
System.out.println("waiting...");
String mess=sr.waitTillMessageReceived(5000L);
System.out.println("message: "+mess);
}
public synchronized void receive(String message){
this.message = message;
notify();
}
public synchronized String waitTillMessageReceived(long timeout){
try {
wait(timeout);
}
catch (InterruptedException e) {
}
return message;
}
}
The example presents some problems. For example it does not handle properly the arrival of many simultaneous messages. As suggested in comments, it is better to use proper synchronised classes like java.util.concurrent.LinkedBlockingDeque.
Upvotes: 1
Reputation: 6058
BlockingQueue
is the probably the easiest way to implement itimport java.util.concurrent.*;
class SignalReceiver {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public void receive(String message) {
queue.offer(message);
}
public String waitTillMessageReceived() throws InterruptedException {
return queue.take();
}
}
Upvotes: 0
Reputation: 339837
BlockingQueue
I followed the lead provided by Louis Wasserman and by markspace, suggesting the use of a queue.
I went with a BlockingQueue
which is (a) thread-safe, and (b) can block when adding & removing elements. I started with the example code given on the Javadoc for that interface.
For an implementation of the BlockingQueue
interface, I chose ArrayBlockingQueue
.
I created a pair of producer-consumer classes. They produce and consume a randomly-generated UUID value via the UUID
class.
Here is the producer class. In an endless loop, the run
method attempts to add a UUID object to the queue. The attempt to add blocks if the queue is currently busy, continuing when the queue becomes available. If interrupted, such as by the closure of the executor service, this run
method completes, thereby ending the submitted task.
package work.basil.example.async;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
public class Producer implements Runnable
{
// Member fields.
private final BlockingQueue < UUID > queue;
// Constructor.
Producer ( BlockingQueue < UUID > q ) { queue = q; }
public void run ( )
{
System.out.println( "`Producer` object is starting its `run` method. " + Instant.now() );
try
{
while ( true )
{
// Sleep, to simulate some lengthy work.
Duration timeToSleep = Duration.ofMillis( ThreadLocalRandom.current().nextInt( 1_000 , 3_000 ) );
try { Thread.sleep( timeToSleep ); } catch ( InterruptedException e ) { break; }
queue.put( produce() ); // Post a new value to our BlockingQueue.
}
}
catch ( InterruptedException e )
{
// Could be interrupted by an executor service closing, or other reasons.
// Simply let this `Runnable` object end its `run` method.
// No code needed here.
}
System.out.println( "`Producer` object is ending its `run` method, after being interrupted. " + Instant.now() );
}
// Logic
UUID produce ( )
{
UUID uuid = UUID.randomUUID();
System.out.println( "Producing UUID = " + uuid + " at " + Instant.now() );
return uuid;
}
}
And the consumer class. The run
method asks for the next available element from the queue. The call blocks until an element becomes available. While waiting, if an interrupt happens such as when the executor service closes, the run
method completes, thereby ending the submitted task.
package work.basil.example.async;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable
{
private final BlockingQueue < UUID > queue;
Consumer ( BlockingQueue < UUID > q ) { queue = q; }
public void run ( )
{
System.out.println( "`Consumer` object is starting its `run` method. " + Instant.now() );
try
{
while ( true ) { consume( queue.take() ); }
}
catch ( InterruptedException e )
{
// Could be interrupted by an executor service closing, or other reasons.
// Simply let this `Runnable` object end its `run` method.
// No code needed here.
}
System.out.println( "`Consumer` object is ending its `run` method, after being interrupted. " + Instant.now() );
}
void consume ( UUID uuid )
{
System.out.println( "Consuming UUID: " + uuid + " at " + Instant.now() );
}
}
And an app class to demonstrate the queue in action.
Be aware that you are responsible for gracefully closing your executor service at some point, at least by the end of your app’s execution. Otherwise the backing pool of threads may run indefinitely, like a zombie 🧟♂️.
package work.basil.example.async;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;
public class App
{
public static void main ( String[] args )
{
System.out.println( "INFO - Demo start. " + Instant.now() );
BlockingQueue q = new ArrayBlockingQueue( 10 , true );
Consumer c = new Consumer( q );
Producer p = new Producer( q );
ExecutorService executorService = Executors.newFixedThreadPool( 2 );
executorService.submit( c );
executorService.submit( p );
// Let the background threads do their work for a while.
try { Thread.sleep( Duration.ofSeconds( 8 ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
System.out.println( "INFO - Main thread is interrupting tasks run by executor service." );
executorService.shutdownNow();
try
{
if ( ! executorService.awaitTermination( 30 , TimeUnit.SECONDS ) )
{
System.err.println( "Executor service failed to terminate. " + Instant.now() );
}
}
catch ( InterruptedException e ) { throw new RuntimeException( e ); }
System.out.println( "INFO - Demo end. " + Instant.now() );
}
}
This seems to be working.
INFO - Demo start. 2023-02-28T07:24:03.603169Z
`Consumer` object is starting its `run` method. 2023-02-28T07:24:03.614092Z
`Producer` object is starting its `run` method. 2023-02-28T07:24:03.614092Z
Producing UUID = 5fe7dd8a-9cc1-47b8-93aa-87bc031c2534 at 2023-02-28T07:24:04.855110Z
Consuming UUID: 5fe7dd8a-9cc1-47b8-93aa-87bc031c2534 at 2023-02-28T07:24:04.863520Z
Producing UUID = 97a31391-8c5c-4430-9737-b967a8c63987 at 2023-02-28T07:24:06.678767Z
Consuming UUID: 97a31391-8c5c-4430-9737-b967a8c63987 at 2023-02-28T07:24:06.679680Z
Producing UUID = af485e45-5dd5-4b68-82c8-41e1443c4566 at 2023-02-28T07:24:08.337011Z
Consuming UUID: af485e45-5dd5-4b68-82c8-41e1443c4566 at 2023-02-28T07:24:08.337917Z
Producing UUID = 4853a43c-feb6-41ec-91b0-1520c9f67347 at 2023-02-28T07:24:10.927173Z
Consuming UUID: 4853a43c-feb6-41ec-91b0-1520c9f67347 at 2023-02-28T07:24:10.928794Z
INFO - Main thread is interrupting tasks run by executor service.
`Consumer` object is ending its `run` method, after being interrupted. 2023-02-28T07:24:11.619844Z
`Producer` object is ending its `run` method, after being interrupted. 2023-02-28T07:24:11.619725Z
INFO - Demo end. 2023-02-28T07:24:11.621988Z
Caveat: I am no expert on concurrency matters. Use at your own risk.
Upvotes: 2