nick.katsip
nick.katsip

Reputation: 868

Unexplained halt of execution

I am working on a Java process that contains 2 threads: one for reading a file's contents and adding them in one shared blocking queue; and one for retrieving the lines from the blocking queue and sending them through the network (under a specified send rate). The two classes I have are the following:

Updated Code below

Producer Thread:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;

public class SourceFileProducer implements Runnable {

    private File file;

    private BufferedReader reader;

    private ArrayBlockingQueue<String> buffer;

    private String fileName;

    private String endMarker;

    public SourceFileProducer(ArrayBlockingQueue<String> buffer, 
            String endMarker, String fileName) {
        this.buffer = buffer;
        this.endMarker = endMarker;
        file = new File(fileName);
        if(file.exists()) {
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }
        this.fileName = fileName;
    }

    @Override
    public void run() {
        System.out.println("SourceFileProducer thread-" + Thread.currentThread().getId() + " initiating with source file: " + fileName);
        String line = "";
        try {
            while((line = reader.readLine()) != null) {
                try {
                    buffer.put(line);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                buffer.put(endMarker);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("SourceFileProducer thread-" + Thread.currentThread().getId() + " scanned and buffered the whole file.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

and the Consumer thread:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;

public class SourceFileConsumer implements Runnable {

    private ArrayBlockingQueue<String> buffer;

    private BufferedReader socketInput;

    private PrintWriter socketOutput;

    private Socket client;

    private ServerSocket serverSocket;

    private long checkpoint[] = null;

    private int rate[] = null;

    private String endMarker;

    public SourceFileConsumer(ArrayBlockingQueue<String> buffer, String endMarker, 
            ServerSocket serverSocket, Socket client, long checkpoint[], int rate[]) {
        this.buffer = buffer;
        this.endMarker = endMarker;
        this.client = client;
        try {
            socketOutput = new PrintWriter(client.getOutputStream(), true);
            socketInput = new BufferedReader(new InputStreamReader(client.getInputStream()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.checkpoint = new long[checkpoint.length];
        this.rate = new int[rate.length];
        for(int i = 0; i < checkpoint.length; i++) {
            this.checkpoint[i] = checkpoint[i];
            this.rate[i] = rate[i];
        }
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        String line = null;
        long start = System.currentTimeMillis();
        int index = 0;
        boolean fileScanFlag = true;
        while(fileScanFlag) {
            long startTimestamp = System.currentTimeMillis();
            long interval = (startTimestamp - start) / 1000L;
            if(interval >= checkpoint[index]) {
                if(index < checkpoint.length - 1) {
                    if(interval >= checkpoint[index + 1]) {
                        index += 1;
                        System.out.println("SourceFileConsumer thread-" + Thread.currentThread().getId() + 
                                " progressed to checkpoint " + index + " with rate: " + rate[index]);
                    }
                }
            }
            int counter = 0;
            while(counter < rate[index]) {
                try {
                    line = buffer.take();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                if(line == endMarker) {
                    fileScanFlag = false;
                    break;
                }
                if(socketOutput != null && socketOutput.checkError()) {
                    System.out.println("SourceFileConsumer Thread-" + Thread.currentThread().getId() + " detected broken link...");
                    try {
                        client = serverSocket.accept();
                        socketOutput = new PrintWriter(client.getOutputStream(), true);
                        socketInput = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    } catch(IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("SourceFileConsumer Thread-" + Thread.currentThread().getId() + " re-established connection...");
                }
                if(socketOutput != null)
                    socketOutput.println(line);
                counter += 1;
            }
            long endTimestamp = System.currentTimeMillis();
            if(endTimestamp - startTimestamp <= 1000) {
                System.out.println("thread-" + Thread.currentThread().getId() + " input rate: " + counter + 
                        ", wait time: " + (1000 - (endTimestamp - startTimestamp)));
                try {
                    Thread.sleep((1000 - (endTimestamp - startTimestamp)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        if(socketInput != null && socketOutput != null && client != null) {
            try {
                socketInput.close();
                socketOutput.close();
                client.close();
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("SourceFileConsumer Thread-" + Thread.currentThread().getId() + " transfer complete.");
    }   
}

The problem is that, after a while, both threads hang and no tuples are sent. When I run a top command in my Linux machine, I see that the Java process, in which the two threads are running in, uses a really small amount of CPU time. Why is this happening? Is this a problem with starvation? I think that starvation can be avoided by using the LinkedBlockingQueue.

Any hints?

Thanks, Nick

Upvotes: 0

Views: 68

Answers (1)

Holger
Holger

Reputation: 298599

That’s quite a lot of code, especially within your consumer. So it’s not possible to preclude that there are multiple errors. I recommend to simplify your code to narrow the problem, e.g. test your producer-consumer hand-off and the network operations independently.

One obvious problem is that you are trying to signal the end of a file via an AtomicBoolean but your consumer isn’t actually testing it before taking items. If you look at the place where it takes items, there is an inner loop:

while(counter < rate[index]) {
            try {
                line = buffer.take();
…

Since the producer has no influence on the counter < rate[index] condition, it has no control over how many lines the consumer will attempt to take before checking the state of the fileScanFlag.

But even if you try to fix this by checking the boolean flag right before take, the result is broken due to possible race conditions. The atomic boolean and the blocking queue are both thread-safe on their own but your combination of the two is not.

Putting the last item on the queue and setting the flag are two distinct operations. Right in-between these two actions, the consumer can take the last item, recheck the flag and find it being false and go to the next attempt to take while the producer is about to set it to true.

One solution is to change the order of the operations on the consumer side, which requires resorting to polling:

polling: for(;;) {
    line = buffer.poll(timeout, timeOutUnit); // control the cpu consumption via timeout
    if(line!=null) break polling;
    if(fileScanFlag.get()) break outerLoop;
}

An alternative is not to use two different communication constructs. Instead of maintaining a boolean flag, place an end marker object to the queue once the file reached an end. This is one of the rare cases, where using the identity of a String rather than equals is appropriate:

public class SourceFileProducer implements Runnable {
    private String endMarker;
    …
    public SourceFileProducer(LinkedBlockingQueue<String> buffer, 
            String endMarker, String fileName) {
        this.buffer = buffer;
        this.endMarker = endMarker;
    …

    @Override
    public void run() {
        System.out.println("SourceFileProducer thread-" + Thread.currentThread().getId()
            + " initiating with source file: " + fileName);
        String line;
        try {
            while((line = reader.readLine()) != null) buffer.put(line);
        } catch (IOException|InterruptedException e) {
            e.printStackTrace();
        }
        buffer.put(endMarker);
    }

 

public class SourceFileConsumer implements Runnable {
    private String endMarker;
    …

    public SourceFileConsumer(LinkedBlockingQueue<String> buffer, String endMarker, 
            ServerSocket serverSocket, Socket client, long checkpoint[], int rate[]) {
        this.buffer = buffer;
        this.endMarker = endMarker;
…

                    line = buffer.take();
                    if(line==endMarker) break;

The value of the end marker doesn’t matter but it’s object identity. Hence, the code which creates the two threads must contain something like:

 // using new to ensure unique identity
private static final String EOF = new String("end of file");

…
new SourceFileProducer(queue, EOF, …)
new SourceFileConsumer(queue, EOF, …)

The new operator guarantees to produce an object with a unique identity, therefore, comparing that marker object with any other String, i.e. the lines returned by BufferedReader, via == will always evaluate to false. Care must be taken not to let the marker object escape to code not knowing about its special role.

Upvotes: 1

Related Questions