I have a Producer/Consumer situation where the producer produces domains for the consumer to visit. The consumer sends an HTTPS request, grabs the links from the page, and submits them back to the producer. When the producer finishes, the consumer does not; it hangs on the final domain. I cannot for the life of me figure out why this is happening.
I have simplified my question to the code below.
Main:
public class Main {
    public static void main(String[] args) throws InterruptedException {
        try
        {
            Broker broker = new Broker();
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
            threadPool.execute(new Consumer(broker));
            threadPool.execute(new Consumer(broker));
            Future producerStatus = threadPool.submit(new Producer(broker));
            // this will wait for the producer to finish its execution.
            producerStatus.get();
            threadPool.shutdown();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
Broker:
public class Broker {
    private BlockingQueue<String> QUEUE = new LinkedBlockingQueue<String>();
    public Boolean continueProducing = Boolean.TRUE;

    public void put(String data) throws InterruptedException
    {
        this.QUEUE.put(data);
    }

    public String get() throws InterruptedException
    {
        //return this.queue.poll(1, TimeUnit.SECONDS);
        return this.QUEUE.take();
    }
}
Consumer:
public class Consumer implements Runnable {
    private Broker broker;

    public Consumer(Broker broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try {
            String data = broker.get();
            while (broker.continueProducing || data != null)
            {
                Thread.sleep(1000);
                System.out.println("Consumer " + Thread.currentThread().getName() + " processed data from broker: " + data);
                data = broker.get();
            }
            System.out.println("Consumer " + Thread.currentThread().getName() + " finished its job; terminating.");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
Producer:
public class Producer implements Runnable {
    private Broker broker;

    public Producer(Broker broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try
        {
            for (int i = 0; i < 2; ++i) {
                System.out.println("Producer produced: " + "https://example" + i + ".com");
                Thread.sleep(100);
                broker.put("https://example" + i + ".com");
            }
            //broker.put("https://example.com/2");
            this.broker.continueProducing = Boolean.FALSE;
            System.out.println("Producer finished its job; terminating.");
        } catch (Exception e)
        {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
Updated answer:
When I run your code, the consumer gets stuck on the line data = broker.get(). The broker is calling the BlockingQueue.take method. Here's the Javadoc for this method (emphasis mine):
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
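That means that even after the producer has finished, the consumer stays blocked inside take(), waiting for an element that will never arrive. Because the thread is blocked there, it never gets back to the loop condition to notice that continueProducing has changed.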
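To make the blocking behaviour concrete, here is a minimal sketch comparing it with the timed poll call that is commented out in the question's Broker. The class name PollVersusTake and the helper pollWithTimeout are made up for this illustration; only BlockingQueue.poll(long, TimeUnit) and take() are real API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollVersusTake {
    // Hypothetical helper: waits at most timeoutMillis for an element
    // and returns null if nothing arrives in that time.
    static String pollWithTimeout(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("https://example0.com");

        // First call returns the queued URL immediately.
        System.out.println(pollWithTimeout(queue, 100));
        // Second call gives up after the timeout and returns null.
        System.out.println(pollWithTimeout(queue, 100));
        // Calling queue.take() here instead would block indefinitely,
        // because nothing else is ever put on the queue.
    }
}
```

A timed poll lets the consumer re-check a stop condition periodically, at the cost of waking up even when there is nothing to do.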
One possible solution for you would be to use a "poison pill" approach. Assuming that you only ever have one producer, your Broker class could look like this:
public class Broker {
    private static final String POISON_PILL = "__POISON_PILL__";
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public void put(String data) {
        queue.add(data);
    }

    public void doneProducing() {
        queue.add(POISON_PILL);
    }

    public String get() throws InterruptedException {
        String result = queue.take();
        if (result.equals(POISON_PILL)) {
            queue.add(POISON_PILL);
            return null;
        } else {
            return result;
        }
    }
}
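As a rough sketch of how the producer and consumers might use such a Broker, the example below repeats the class in compact form so that it compiles on its own. PoisonPillDemo and its run helper are hypothetical names introduced only for this illustration, and the consumers here just collect the items instead of making HTTPS requests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PoisonPillDemo {
    // Same idea as the Broker above, repeated so the sketch is self-contained.
    static class Broker {
        private static final String POISON_PILL = "__POISON_PILL__";
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        void put(String data) { queue.add(data); }
        void doneProducing() { queue.add(POISON_PILL); }

        String get() throws InterruptedException {
            String result = queue.take();
            if (result.equals(POISON_PILL)) {
                queue.add(POISON_PILL); // leave the pill for the other consumers
                return null;
            }
            return result;
        }
    }

    // Hypothetical helper: starts the consumers and one producer,
    // waits for all of them, and returns the items that were processed.
    static List<String> run(int consumers, int items) throws InterruptedException {
        Broker broker = new Broker();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    String data;
                    // The loop ends when get() returns null, i.e. the pill was seen.
                    while ((data = broker.get()) != null) {
                        processed.add(data);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.execute(() -> {
            for (int i = 0; i < items; i++) {
                broker.put("https://example" + i + ".com");
            }
            broker.doneProducing(); // signal that no more data will arrive
        });

        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("consumers did not terminate");
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(2, 2).size()); // prints 2
    }
}
```

Note that the consumer loop in this sketch checks only for null; the continueProducing flag from the question is no longer needed.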
Answer for previous code:
It would be great if you could narrow the scope of this question so that it included only the minimum amount of code to get the deadlock. As it currently is, there's a lot of code that you're posting that is not relevant, and there's some code that is relevant that you're not posting.
Additionally, there are a lot of problems with your current code. Your toLinkedHashSet method does not compile. In your add method, you're calling the BlockingQueue.put method even though your BlockingQueue should never hit its limit. You claim to want O(1) time for contains, but your code has O(n) time. You also seem to be doing a lot of unnecessary copying in your addAll and contains methods.
There's not enough information here for me to know what the problem is, but one thing that could be causing your problem is in your get method. If the consumer thread is interrupted, then your get method will cause it to uninterrupt itself (which probably wouldn't lead to a deadlock, but could look like one). In Java, it's very rarely acceptable to ignore an exception. If your call to the take method throws an InterruptedException, it's for a reason: another thread wants the current thread to stop. Your get method should throw InterruptedException. For example:
public String get() throws InterruptedException {
    return unprocessed.take();
}
If you really need the get method to not throw an InterruptedException, you could throw some other chained exception containing the InterruptedException. If it's really appropriate to return "" on interruption, you could do something like this:
public String get() {
    try {
        return unprocessed.take();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return "";
    }
}
By interrupting the current thread, you are making sure that at least the current thread is marked as interrupted, so something down the line could deal with it. But throwing InterruptedException is probably most appropriate if possible.
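The chained-exception alternative mentioned earlier could be sketched roughly as follows. BrokerInterruptedException and ChainedInterruptDemo are made-up names for this illustration, and the unprocessed queue is a stand-in for the field in your class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ChainedInterruptDemo {
    // Hypothetical unchecked exception that carries the original
    // InterruptedException as its cause.
    static class BrokerInterruptedException extends RuntimeException {
        BrokerInterruptedException(InterruptedException cause) {
            super(cause);
        }
    }

    // Stand-in for the queue field used in the snippets above.
    static final BlockingQueue<String> unprocessed = new LinkedBlockingQueue<>();

    static String get() {
        try {
            return unprocessed.take();
        } catch (InterruptedException e) {
            // Restore the interrupted status, then report the interruption
            // to the caller without a checked exception in the signature.
            Thread.currentThread().interrupt();
            throw new BrokerInterruptedException(e);
        }
    }

    public static void main(String[] args) {
        // Simulate another thread asking this one to stop.
        Thread.currentThread().interrupt();
        try {
            get();
        } catch (BrokerInterruptedException e) {
            System.out.println(e.getCause() instanceof InterruptedException); // prints true
            System.out.println(Thread.currentThread().isInterrupted());       // prints true
        }
    }
}
```

Callers still find out that the thread was asked to stop, and the original exception remains available through getCause().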
I still don't understand why you're creating your own wrapper for LinkedBlockingQueue, as opposed to just using a LinkedBlockingQueue on its own. It seems like everything you're adding on top of LinkedBlockingQueue is doing nothing but slowing it down.