Reputation: 2922
I'm implementing a custom data structure that should follow these requirements:
.equals()
I have implemented my own class (I started by modifying LinkedBlockingQueue), but there is a point where a bad interleaving can occur and I can't figure out how to avoid it.
To test the data structure I created a method with 3 threads. Each of these threads tries to receive a unique string (e.g., "Thread1" for the first thread, and so on). If the string is found in the datastore, the thread inserts a string for the next thread (e.g., Thread 1 will insert "Thread2").
This way, a thread blocks and only sends a message once its own message is in the store. To start things off, after starting my threads, I manually add "Thread1" to the store. This should trigger Thread 1 to take its value out of the store and insert a value for Thread 2. Thread 2 should then be notified, take its value, insert a value for Thread 3, and so on.
However, I notice that the cycle stops after the message has been passed around a random number of times. This is - I think - due to a bad interleaving that is possible in the takeOrWait() method (as indicated there). I have tried several solutions, but I can't figure out a way to fix it.
The problem is - I think - that I have to release the lock to modify, followed by a call to sync.wait(). In between those calls a thread can already insert an element in the queue, which would cause all the waiting threads to miss the notification.
Is it possible to initiate the wait() before I release the lock?
For completeness' sake I have added the Main.java class I use to test.
BlockingDataStore
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingStore<T> {
    // Linked list node.
    static class Node<E> {
        /**
         * The item, volatile to ensure barrier separating write and read
         */
        volatile E item;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    private Node<T> _head;
    private Node<T> _lastPtr;
    private int _size;

    // Locks
    private Lock changeLock = new ReentrantLock();
    private final Object sync = new Object();

    //////////////////////////////////
    //         CONSTRUCTOR          //
    //////////////////////////////////
    public BlockingStore() {
        _head = null;
        _lastPtr = null;
    }

    //////////////////////////////////
    //    INTERNAL MODIFICATION     //
    //////////////////////////////////
    /**
     * Locates an element in the storage and removes it.
     * Returns null if the element is not found in the list.
     *
     * @param toRemove Element to remove.
     * @return Returns the removed element.
     */
    private T findAndRemove(T toRemove) {
        T result = null;
        // Empty queue.
        if (_head == null)
            return result;
        // Do we want the head?
        if (_head.item.equals(toRemove)) {
            result = _head.item;
            _head = _head.next;
            this._size--;
            return result;
        }
        Node<T> previous = _head;
        Node<T> current = previous.next;
        while (current != null) {
            if (current.item.equals(toRemove)) {
                // Take the element out of the list.
                result = current.item;
                // Unlink the node in every case; if it was the tail,
                // the last pointer has to move back to its predecessor.
                previous.next = current.next;
                if (current == _lastPtr)
                    _lastPtr = previous;
                this._size--;
                return result;
            }
            previous = current;
            current = current.next;
        }
        return result;
    }

    /**
     * Adds an element to the end of the list.
     *
     * @param toAdd Element to add to the end of the list.
     */
    private void addToEnd(T toAdd) {
        // If the queue is empty
        if (_head == null) {
            _head = new Node<T>(toAdd);
            _lastPtr = _head;
        } else {
            _lastPtr.next = new Node<T>(toAdd);
            _lastPtr = _lastPtr.next;
        }
        this._size++;
    }

    /**
     * Takes an element from the front of the list.
     * Returns null if list is empty.
     *
     * @return Element taken from the front of the list.
     */
    private T takeFromFront() {
        // Check for empty queue.
        if (_head == null)
            return null;
        T result = _head.item;
        _head = _head.next;
        this._size--;
        return result;
    }

    //////////////////////////////////
    //         API METHODS          //
    //////////////////////////////////
    /**
     * Takes an element from the datastore,
     * if it is not found the method blocks
     * and retries every time a new object
     * is inserted into the store.
     *
     * @param toTake
     * @return
     */
    public T takeOrWait(T toTake) {
        T value;
        changeLock.lock();
        value = findAndRemove(toTake);
        // Wait until somebody adds to the store
        // and then try again.
        while (value == null)
            // Sync on the notification object
            // such that we are woken up when there
            // is a new element.
            synchronized (sync) {
                changeLock.unlock(); // allow writes.
                // I think I can have bad inter-leavings here.
                // If an insert is interleaved here, the thread
                // will never be notified..
                try {
                    sync.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                changeLock.lock();
                value = findAndRemove(toTake);
            }
        changeLock.unlock();
        return value;
    }

    public T dequeue() {
        T value;
        changeLock.lock();
        value = takeFromFront();
        changeLock.unlock();
        return value;
    }

    public void enqueue(T toPut) {
        changeLock.lock();
        addToEnd(toPut);
        // Notify about new element in queue.
        synchronized (sync) {
            sync.notifyAll();
            changeLock.unlock();
        }
    }

    public int size() {
        return _size;
    }
}
** Main.java **
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final BlockingStore<String> q = new BlockingStore<String>();

        new Thread(new Runnable() {
            public String name = "Thread 1: ";
            public String to = "Thread 2: ";

            public void print(String message) {
                System.out.println(name + message);
            }

            @Override
            public void run() {
                while (true) {
                    String value = q.takeOrWait(name);
                    print("Got: " + value);
                    q.enqueue(to);
                }
            }
        }).start();

        new Thread(new Runnable() {
            public String name = "Thread 2: ";
            public String to = "Thread 3: ";

            public void print(String message) {
                System.out.println(name + message);
            }

            @Override
            public void run() {
                while (true) {
                    String value = q.takeOrWait(name);
                    print("Got: " + value);
                    q.enqueue(to);
                }
            }
        }).start();

        new Thread(new Runnable() {
            public String name = "Thread 3: ";
            public String to = "Thread 1: ";

            public void print(String message) {
                System.out.println(name + message);
            }

            @Override
            public void run() {
                while (true) {
                    String value = q.takeOrWait(name);
                    print("Got: " + value);
                    q.enqueue(to);
                }
            }
        }).start();

        Thread.sleep(1000);
        System.out.println("Main: Sending new message to queue for thread 1");
        q.enqueue("Thread 1: ");
    }
}
Upvotes: 0
Views: 1055
Reputation: 27210
"The problem is - I think - that I have to release the lock to modify, followed by a call to sync.wait()"
Sounds like a lost notification. Understand that sync.notify() does nothing at all if there is not some other thread already blocked in sync.wait(). The sync object does not remember that it was notified.
This doesn't work (based on your example):
public void waitForFlag() {
    ...
    while (! flag) {
        synchronized (sync) {
            try {
                sync.wait();
            } catch (InterruptedException e) { ... }
        }
    }
}

public void setFlag() {
    flag = true;
    synchronized (sync) {
        sync.notifyAll();
    }
}
Suppose thread A calls waitForFlag(), finds flag to be false, and then is preempted. Then, thread B calls setFlag(), notifying no-one. Finally, thread A calls sync.wait().
Now you have the flag set, and thread A blocked in a wait() call, waiting for somebody to set the flag. That's what a lost notification looks like.
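To see for yourself that a monitor does not remember notifications, here is a minimal sketch. The class name LostNotificationDemo and the method notifyThenWait are made up for this illustration. It calls notifyAll() while nobody is waiting and then does a timed wait() on the same object. If the earlier notification were remembered, the wait would return immediately; instead it should block for roughly the whole timeout (spurious wakeups are permitted by the Java specification, so an early return is possible in principle, just rare).

```java
// Hypothetical demo class, not part of the code in the question.
class LostNotificationDemo {
    private static final Object sync = new Object();

    /**
     * Calls notifyAll() with no thread waiting, then waits on the same
     * monitor for at most timeoutMillis. Returns the elapsed time in ms.
     */
    static long notifyThenWait(long timeoutMillis) throws InterruptedException {
        synchronized (sync) {
            sync.notifyAll(); // no thread is in wait(), so this has no effect
        }
        long start = System.nanoTime();
        synchronized (sync) {
            // The notification above is gone; only the timeout
            // (or a rare spurious wakeup) ends this wait.
            sync.wait(timeoutMillis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Waited about " + notifyThenWait(200) + " ms");
    }
}
```

The timed wait is only there so the demo terminates. With a plain wait(), as in the example above, the thread would stay blocked.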
Here's how it should look:
public void waitForFlag() {
    ...
    synchronized(sync) {
        while (! flag) {
            try {
                sync.wait();
            } catch (InterruptedException e) { ... }
        }
    }
}

public void setFlag() {
    synchronized (sync) {
        flag = true;
        sync.notifyAll();
    }
}
This way, the notification cannot be lost, because the statement that sets the flag and the statement that tests the flag are both inside blocks synchronized on the same object, and sync.wait() gives up that monitor only as the thread starts waiting.
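The same idea carries over to your store if a single lock guards both the list and the waiting. One way to get that with a ReentrantLock is a Condition created from it: Condition.await() atomically releases the lock and blocks, then reacquires the lock before returning, which is the "start waiting before I release the lock" behaviour you asked about. What follows is only a sketch under assumptions, not a drop-in replacement: the class name ConditionStore is made up, a java.util.LinkedList stands in for your hand-written nodes, and takeOrWait propagates InterruptedException instead of swallowing it.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; names and the LinkedList backing are assumptions.
class ConditionStore<T> {
    private final LinkedList<T> items = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();
    // Signalled every time an element is added.
    private final Condition added = lock.newCondition();

    /** Blocks until an element equal to toTake is in the store, then removes it. */
    public T takeOrWait(T toTake) throws InterruptedException {
        lock.lock();
        try {
            T value;
            // The test and the wait both happen under the same lock,
            // so an enqueue cannot slip in between them.
            while ((value = findAndRemove(toTake)) == null) {
                added.await(); // releases the lock while waiting, reacquires it after
            }
            return value;
        } finally {
            lock.unlock();
        }
    }

    /** Appends an element and wakes every waiting taker so it can re-check. */
    public void enqueue(T toPut) {
        lock.lock();
        try {
            items.addLast(toPut);
            added.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock. Returns null if no equal element exists.
    private T findAndRemove(T toRemove) {
        for (Iterator<T> it = items.iterator(); it.hasNext(); ) {
            T candidate = it.next();
            if (candidate.equals(toRemove)) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }
}
```

Because there is only one lock here, there is also no lock-ordering question between a separate changeLock and sync monitor.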
Upvotes: 1