Reputation: 33
This is my first question on Stack Overflow. My problem is the following:
I have a Producer and a Consumer class. The Producer reads a file line by line and collects the lines in a List of Strings. Once the list holds a fixed number of lines, it is added to an ArrayBlockingQueue. One Producer thread is started from the main thread, along with a couple of Consumer threads. Each Consumer takes an item from the queue, which should be a list of lines, and walks through it looking for a particular word. Each time the word is found, it increments a count variable.
What happens is that when a Consumer takes an item from the queue, the item turns out to be empty. I cannot figure out why, because my Producer should certainly have added the lines to the queue.
My code looks like this:
Consumer class:
public static class Consumer implements Callable<Integer> {
int count = 0;
@Override
public Integer call() throws Exception {
List<String> list = new ArrayList<>();
list = arrayBlockingQueueInput.take();
do {
if (!list.isEmpty()){
for (int i = 0; i < arrayBlockingQueueInput.take().size(); i++) {
for (String element : list.get(i).split(" ")) {
if (element.equalsIgnoreCase(findWord)) {
count++;
}
}
}
} else {
arrayBlockingQueueInput.put(list);
}
} while (list.get(0) != "HALT");
return count;
}
}
Producer Class:
public static class Producer implements Runnable {
@Override
public void run() {
try {
FileReader file = new FileReader("src/testText.txt");
BufferedReader br = new BufferedReader(file);
while ((textLine = br.readLine()) != null) {
if (textLine.isEmpty()) {
continue;
}
/* Remove punctuation from the text, except of punctuation that is useful for certain words.
* Examples of these words are don't or re-enter */
textLine = textLine.replaceAll("[[\\W]&&[^']&&[^-]]", " ");
/* Replace all double whitespaces with single whitespaces.
* We will split the text on these whitespaces later */
textLine = textLine.replaceAll("\\s\\s+", " ");
textLine = textLine.replaceAll("\\n", "").replaceAll("\\r", "");
if (results.isEmpty()) {
results.add(textLine);
continue;
}
if (results.size() <= SIZE) {
results.add(textLine);
if (results.size() == SIZE) {
if (arrayBlockingQueueInput.size() == 14){
List<String> list = new ArrayList<String>();
list.add(HALT);
arrayBlockingQueueInput.put(list);
} else{
arrayBlockingQueueInput.put(results);
results.clear();
}
}
}
}
/* Count the remaining words in the list
* (last lines of the file do perhaps not fill up until the given SIZE, therefore need to be counted here)
* Fill the list with empty items if the size of the list does not match with the given SIZE */
while (results.size() != SIZE) {
results.add("");
}
arrayBlockingQueueInput.put(results);
List<String> list = new ArrayList<String>();
list.add(HALT);
arrayBlockingQueueInput.put(list);
results.clear();
} catch (InterruptedException e) {
producerIsRunning = false;
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Main Class:
public void main() throws IOException, InterruptedException {
System.out.println("Enter the word you want to find: ");
Scanner scan = new Scanner(System.in);
findWord = scan.nextLine();
System.out.println("Starting...");
long startTime = System.currentTimeMillis();
Thread producer = new Thread(new Producer());
producer.start();
ExecutorService executorService = Executors.newFixedThreadPool(CORE);
List<Future<Integer>> futureResults = new ArrayList<Future<Integer>>();
for (int i = 0; i < CORE; i++) {
futureResults.add(executorService.submit(new Consumer()));
}
executorService.shutdown();
for (Future<Integer> result : futureResults) {
try {
wordsInText += result.get();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.join();
long stopTime = System.currentTimeMillis();
System.out.println("The word " + findWord + " appears " + wordsInText + " times in the given text");
System.out.println("Elapsed time was " + (stopTime - startTime) + " milliseconds.");
}
Can anybody explain why this happens? I would like to add that we are trying to use a poison pill to notify the consumers that the producer has halted.
To answer the question of why we are doing it this way: for school, we are trying to parallelize a programming problem of our choice, and we chose string matching. We first made a serial solution and a parallel solution. For the next assignment we have to improve our parallel solution, and our teacher told us that this is a way to do it.
Thanks in advance!
Nick
Upvotes: 1
Views: 99
Reputation: 33
My teacher helped us find the problem. There were two mistakes. The first was in the Producer class: I had code inside the main while loop that signalled a HALT of the producer. It should not have been there.
The second was in the Consumer class: the .take() I called before the do-while loop should have been called inside the loop.
The correct code looks like this:
Consumer class:
public static class Consumer implements Callable<Integer> {
    int count = 0;

    @Override
    public Integer call() throws Exception {
        List<String> list;
        do {
            // Take a new batch on every iteration; take() blocks until one is available.
            list = arrayBlockingQueueInput.take();
            if (!list.get(0).equals(HALT)) {
                // Normal batch: count the occurrences of the search word in every line.
                for (String line : list) {
                    for (String element : line.split(" ")) {
                        if (element.equalsIgnoreCase(findWord)) {
                            count++;
                        }
                    }
                }
            } else {
                // Poison pill: put it back so the other consumers can see it too.
                arrayBlockingQueueInput.put(list);
            }
        } while (!list.get(0).equals(HALT));
        return count;
    }
}
Producer class:
public static class Producer implements Runnable {
@Override
public void run() {
try {
FileReader file = new FileReader("src/testText.txt");
BufferedReader br = new BufferedReader(file);
while ((textLine = br.readLine()) != null) {
if (textLine.isEmpty()) {
continue;
}
/* Remove punctuation from the text, except of punctuation that is useful for certain words.
* Examples of these words are don't or re-enter */
textLine = textLine.replaceAll("[[\\W]&&[^']&&[^-]]", " ");
/* Replace all double whitespaces with single whitespaces.
* We will split the text on these whitespaces later */
textLine = textLine.replaceAll("\\s\\s+", " ");
textLine = textLine.replaceAll("\\n", "").replaceAll("\\r", "");
if (results.isEmpty()) {
results.add(textLine);
continue;
}
if (results.size() <= SIZE) {
results.add(textLine);
if (results.size() == SIZE) {
arrayBlockingQueueInput.put(new ArrayList<String>(results));
results.clear();
}
}
}
/* Count the remaining words in the list
* (last lines of the file do perhaps not fill up until the given SIZE, therefore need to be counted here)
* Fill the list with empty items if the size of the list does not match with the given SIZE */
while (results.size() != SIZE) {
results.add("");
}
arrayBlockingQueueInput.put(new ArrayList<String>(results));
List<String> list = new ArrayList<String>();
list.add(HALT);
arrayBlockingQueueInput.put(list);
results.clear();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Main class:
public void main() throws IOException, InterruptedException {
System.out.println("Enter the word you want to find: ");
Scanner scan = new Scanner(System.in);
findWord = scan.nextLine();
System.out.println("Starting...");
long startTime = System.currentTimeMillis();
Thread producer = new Thread(new Producer());
producer.start();
ExecutorService executorService = Executors.newFixedThreadPool(CORE);
List<Future<Integer>> futureResults = new ArrayList<Future<Integer>>();
for (int i = 0; i < CORE; i++) {
futureResults.add(executorService.submit(new Consumer()));
}
executorService.shutdown();
for (Future<Integer> result : futureResults) {
try {
wordsInText += result.get();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.join();
long stopTime = System.currentTimeMillis();
System.out.println("The word " + findWord + " appears " + wordsInText + " times in the given text");
System.out.println("Elapsed time was " + (stopTime - startTime) + " milliseconds.");
}
Thanks to @Ivan for pointing out the problem with calling .clear() on results. Without putting a copy of the list on the queue, this solution did not work.
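For anyone who wants to try the poison-pill pattern in isolation, here is a minimal sketch of it. It is not the code from our assignment: the class name PoisonPillDemo, the method countWord and the queue capacity of 4 are made up for the example, and the producer simply runs on the calling thread. It assumes that no real batch starts with the line "HALT".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoisonPillDemo {
    // Marker batch that tells consumers to stop (assumed never to occur as real input).
    static final String HALT = "HALT";

    /** Counts occurrences of word in all batches, using the given number of consumer threads. */
    static int countWord(List<List<String>> batches, String word, int consumers) throws Exception {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(4);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        List<Future<Integer>> futures = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            futures.add(pool.submit(() -> {
                int count = 0;
                while (true) {
                    // take() is called inside the loop, so every iteration sees a new batch.
                    List<String> batch = queue.take();
                    if (!batch.isEmpty() && HALT.equals(batch.get(0))) {
                        // Pass the pill on so the remaining consumers stop as well.
                        queue.put(batch);
                        return count;
                    }
                    for (String line : batch) {
                        for (String token : line.split(" ")) {
                            if (token.equalsIgnoreCase(word)) {
                                count++;
                            }
                        }
                    }
                }
            }));
        }

        // Producer part: enqueue a copy of each batch, then a single pill at the very end.
        for (List<String> batch : batches) {
            queue.put(new ArrayList<String>(batch));
        }
        queue.put(Collections.singletonList(HALT));

        int total = 0;
        for (Future<Integer> future : futures) {
            total += future.get();
        }
        pool.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("the cat", "The dog"),
                Arrays.asList("other the"));
        System.out.println(countWord(batches, "the", 3)); // prints 3
    }
}
```

The pill is compared with equals() rather than != because != on Strings compares references, not contents. One pill is enough for any number of consumers because each consumer puts it back on the queue before returning.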
Upvotes: 0
Reputation: 8758
You add the list to the queue and then clear it:
arrayBlockingQueueInput.put(results);
results.clear();
You need to add a copy of the list to the queue instead, so that clear() does not empty the list that is already in the queue:
arrayBlockingQueueInput.put(new ArrayList<String>(results));
results.clear();
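The effect can be reproduced without any threads. The sketch below is only an illustration (the class QueueAliasingDemo and the method sizeSeenByConsumer are names invented for it): it puts a list on a queue, clears the producer's list, and reports how many lines the taker still sees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueAliasingDemo {

    /** Puts a batch on the queue, clears the producer's list, and returns the size the taker sees. */
    static int sizeSeenByConsumer(boolean copyBeforePut) throws InterruptedException {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(4);
        List<String> results = new ArrayList<>();
        results.add("first line");
        results.add("second line");

        // The queue stores a reference to the list; it does not copy it.
        if (copyBeforePut) {
            queue.put(new ArrayList<String>(results));
        } else {
            queue.put(results);
        }
        // Without the copy, this also empties the list that is sitting in the queue.
        results.clear();

        return queue.take().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shared list: " + sizeSeenByConsumer(false)); // prints "shared list: 0"
        System.out.println("copied list: " + sizeSeenByConsumer(true));  // prints "copied list: 2"
    }
}
```

In the multithreaded program the outcome additionally depends on timing, since a consumer may or may not read the shared list before the producer clears it.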
Upvotes: 3