Reputation: 347
I am trying to create another thread that processes data while main thread doing some more. Main thread must wait till another thread finishes doStuff
with all elements.
And my implementation is pretty straight forward.
Please, take a look at processData
and tell me is there some more Java-like way to do it?
I read about Phaser
but still can't imagine how to use it or what else can I try?
public class MyClass {
private final NodeQueue queue;
MyClass() {
queue = new NodeQueue();
}
public void processData(Set<String> dataSet) {
// allow transfer
queue.transferEnable()
Thread transfer = new Thread(() -> {
queue.transferData();
})
transfer.start();
// doStuff in another thread
for (String element : dataSet) {
queue.add(element);
// do something more
}
// stop transfer
queue.waitTillEmptyQueue();
queue.transferDisable();
try {
transfer.join();
} catch (...) {
// catch
}
}
public class NodeQueue {
private final ConcurrentLinkedQueue<String> queue;
private boolean transferEnabled;
protected NodeQueue() {
queue = new ConcurrentLinkedQueue<>();
transferEnabled = true;
}
protected void transfer() {
while (!queue.isEmpty()) {
doStuff(queue.poll());
}
}
public void transferData() {
while (tranfserEnabled) {
transfer();
}
}
public synchronized void transferEnable() {
transferEnabled = true;
}
public synchronized void transferDisable() {
transferEnabled = false;
}
public void add(String s) {
queue.add(s);
}
public synchronized void waitTillEmptyQueue() {
while (!queue.isEmpty()) {
if (queue.isEmpty()) {
break;
}
}
}
}
}
Upvotes: 0
Views: 56
Reputation: 1124
Let me copy the Phaser
example from my own post
Main thread
// Add producer as a party
Phaser phaser = new Phaser(1);
for (int i=0; i<10000; ++i) {
// Add each task as a party
phaser.register();
queue.put(new Task());
}
// Producer arrived and wait for completion of all tasks
phaser.arriveAndAwaitAdvance();
// At the end, there is only 1 party left which is the producer itself
Consumer
while (true) {
Task task = queue.take();
processTask(task);
// Task completed and remove itself as a party
phaser.arriveAndDeregister();
}
Upvotes: 1