v1shnu

Reputation: 2231

Synchronous access to queue

I have a requirement where I request a URL and get a response. The response is XML data containing child links. The response is copied to a file and the child links are added to a queue; I then have to request each child link in turn until there are no more children.

I first did this using a single queue, but since it was slow, I tried to implement an executor. I do not have to maintain the order of the data. This is my current approach:

public class Hierarchy2 {

    private static AbstractQueue<String> queue = new ConcurrentLinkedQueue<>();
    private static FileWriter writer;

    private static SAXParser saxParser;
    private static XMLHandler xmlHandler = new XMLHandler();

    public static void main(String[] args) throws IOException, ParserConfigurationException, SAXException {
        writer = new FileWriter(new File("hierarchy.txt"));
        String baseUrl = "my url here";

        queue.add(baseUrl);

        int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.execute(new QueueProcess(queue, writer, xmlHandler));
        }

        executor.shutdown();

    }
}

class QueueProcess implements Runnable {

    private AbstractQueue<String> queue;
    private HttpURLConnection connection;
    private URL url;
    private FileWriter writer;
    private SAXParserFactory factory = SAXParserFactory.newInstance();
    private SAXParser saxParser;
    private XMLHandler xmlHandler;

    public QueueProcess(AbstractQueue<String> queue, FileWriter writer, XMLHandler xmlHandler) {
        this.queue = queue;
        this.writer = writer;

        this.xmlHandler = xmlHandler;
    }

    @Override
    public void run() {
        try {
            saxParser = factory.newSAXParser();
            while (true) {
                String link = queue.poll();
                if (link != null) {
                    if (queue.size() >= 500) {
                        System.out.println("here" + "     " + Thread.currentThread().getName());
                        getChildLinks(link);
                    } else {
                        System.out.println(link + "     " + Thread.currentThread().getName());
                        queue.addAll(getChildLinks(link));
                    }
                }
            }
        } catch (IOException | SAXException | ParserConfigurationException e) {
            e.printStackTrace();
        }

    }

    private List<String> getChildLinks(String link) throws IOException, SAXException {
        url = new URL(link);
        connection = (HttpURLConnection) url.openConnection();
        connection.connect();

        String result = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines()
                .collect(Collectors.joining());

        saxParser.parse(new ByteArrayInputStream(result.getBytes()), xmlHandler);
        List<String> urlList = xmlHandler.getURLList();

        writer.write(result + System.lineSeparator());

        connection.disconnect();
        return urlList;
    }

}

The program runs fine, but at some point I get a NullPointerException. It is thrown on the queue.addAll line in QueueProcess's run method.

Exception:

Exception in thread "pool-1-thread-3" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
    at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
    at QueueProcess.run(Hierarchy2.java:77)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
    at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
    at QueueProcess.run(Hierarchy2.java:77)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I can't figure out why there is an NPE, since I check that the queue is not empty on every iteration of the while loop. Please tell me why I get a NullPointerException and how I can prevent it.

Update:

I've finally fixed the NPE. As suggested by @gusto2, it was caused by the ArrayList containing a null value, which the queue does not accept.

Now my code looks like this:

public class Hierarchy2 {

    private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static FileWriter writer;
    private static XMLHandler xmlHandler = new XMLHandler();

    public static void main(String[] args) throws IOException, ParserConfigurationException, SAXException {
        writer = new FileWriter(new File("hierarchy.txt"));
        String baseUrl = "my url here";

        queue.add(baseUrl);

        int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.execute(new QueueProcess(queue, writer, xmlHandler));
        }

        executor.shutdown();

    }
}

class QueueProcess implements Runnable {

    private BlockingQueue<String> queue;
    private HttpURLConnection connection;
    private URL url;
    private FileWriter writer;
    private SAXParserFactory factory = SAXParserFactory.newInstance();
    private SAXParser saxParser;
    private XMLHandler xmlHandler = new XMLHandler();

    public QueueProcess(BlockingQueue<String> queue, FileWriter writer, XMLHandler xmlHandler) {
        this.queue = queue;
        this.writer = writer;
    }

    @Override
    public void run() {
        try {
            saxParser = factory.newSAXParser();
            while (true) {
                String link = queue.poll();
                if (link != null) {
                    System.out.println(link + "     " + Thread.currentThread().getName());
                    queue.addAll(getChildLinks(link));
                }
            }
        } catch (IOException | SAXException | ParserConfigurationException e) {
            e.printStackTrace();
        }

    }

    private List<String> getChildLinks(String link) throws IOException, SAXException {
        url = new URL(link);
        connection = (HttpURLConnection) url.openConnection();
        connection.connect();

        String result = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines()
                .collect(Collectors.joining());

        saxParser.parse(new ByteArrayInputStream(result.getBytes()), xmlHandler);
        List<String> urlList = xmlHandler.getURLList();

        writer.write(result + System.lineSeparator());

        connection.disconnect();
        return urlList;
    }

}

The problem now is to pause the threads once they have TOGETHER processed 500 records. Once 500 is reached, I have to create another file and then resume processing.

Also, please tell me how I can stop the program once the queue has been fully read, i.e. when no more child links will be added to it. Since I am using an always-true while loop, the code runs indefinitely. If I use the condition while(!queue.isEmpty()), only one of the threads will run, as the others will find the queue empty.
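
To make the stopping problem concrete, here is a rough sketch of one direction that might work: keep a shared counter of links that are either queued or still being processed, and let each worker exit once that counter reaches zero. This is only a sketch under assumptions. The class name PendingCounterCrawl, the crawl method and the fetchChildren function (a stand-in for getChildLinks) are hypothetical, the HTTP and file handling are left out, and the child list is assumed to contain no nulls.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class PendingCounterCrawl {

    // Processes every link reachable from root and returns how many were handled.
    // fetchChildren is a hypothetical stand-in for getChildLinks(link).
    static int crawl(String root, int threads, Function<String, List<String>> fetchChildren)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger pending = new AtomicInteger(1);   // links queued or still being processed
        AtomicInteger processed = new AtomicInteger();
        queue.add(root);

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            executor.execute(() -> {
                try {
                    // An empty queue is not enough to stop: another worker may
                    // still be about to add children. The counter covers that case.
                    while (pending.get() > 0) {
                        // Wait briefly instead of spinning on an empty queue.
                        String link = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (link == null) {
                            continue;
                        }
                        try {
                            // Assumption: the returned list contains no null entries.
                            List<String> children = fetchChildren.apply(link);
                            pending.addAndGet(children.size()); // count children before queuing them
                            queue.addAll(children);
                            processed.incrementAndGet();
                        } finally {
                            pending.decrementAndGet();          // this link is finished
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Tiny in-memory hierarchy standing in for the real HTTP responses.
        Function<String, List<String>> fake = link ->
                link.equals("root") ? Arrays.asList("a", "b") : Collections.<String>emptyList();
        System.out.println(crawl("root", 3, fake));
    }
}
```

The children are counted before the parent's own count is released, so the counter cannot drop to zero while work is still outstanding. The same shared-counter idea could presumably be reused for the 500-record limit, but that part is not sketched here.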

Upvotes: 0

Views: 212

Answers (2)

Hasib Mahmud

Reputation: 786

Here you add the base URL to the queue only once. It is not inside the loop.

    queue.add(baseUrl);

    int threadCount = Runtime.getRuntime().availableProcessors() + 1;
    ExecutorService executor = Executors.newFixedThreadPool(threadCount);

    for (int i = 0; i < threadCount; i++) {
        executor.execute(new QueueProcess(queue, writer, xmlHandler));
    }

So when you call QueueProcess(queue, writer, xmlHandler), you pass a queue with a single entry. Then, when you call String link = queue.poll();, it removes that one value. How can queue.size() >= 500 ever be true if the queue passed to QueueProcess(queue, writer, xmlHandler) in the for loop holds only a single value?

Upvotes: 0

gusto2

Reputation: 12087

Exception in thread "pool-1-thread-1" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
    at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)

I'd guess that List<String> urlList = xmlHandler.getURLList(); returns an ArrayList with some null values inside. Without more information, though, it is hard to be more precise.
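
If that guess is right, a minimal sketch like the following should reproduce the exception and show one possible way around it: filter the nulls out before calling addAll. The sample URLs, the class name NullSafeAddAll and the helper withoutNulls are made up for illustration, and the hard-coded list merely stands in for whatever xmlHandler.getURLList() returns.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

public class NullSafeAddAll {

    // Hypothetical helper: drops null entries so the result is safe to pass to addAll.
    static List<String> withoutNulls(List<String> links) {
        return links.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for xmlHandler.getURLList(); the null entry is an assumption.
        List<String> urlList = Arrays.asList("http://example.com/a", null, "http://example.com/b");

        // ConcurrentLinkedQueue does not permit null elements, so this addAll throws.
        Queue<String> probe = new ConcurrentLinkedQueue<>();
        try {
            probe.addAll(urlList);
        } catch (NullPointerException e) {
            System.out.println("addAll rejected a null element");
        }

        // Filtering first lets the remaining links be queued normally.
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.addAll(withoutNulls(urlList));
        System.out.println(queue.size());
    }
}
```

Filtering only hides the symptom. It would still be worth checking why the handler produces a null entry in the first place.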

Upvotes: 1
