Reputation: 2231
I have a requirement where I hit a link and get a response. The response is XML data containing child links. The response is copied to a file and the child links are added to a queue; I then have to hit each child link in turn until there are no more children.
I first did this using a single queue, but since that was slow, I tried to use an executor. I do not have to maintain the order of the data. This is my current approach:
public class Hierarchy2 {
    private static AbstractQueue<String> queue = new ConcurrentLinkedQueue<>();
    private static FileWriter writer;
    private static SAXParser saxParser;
    private static XMLHandler xmlHandler = new XMLHandler();

    public static void main(String[] args) throws IOException, ParserConfigurationException, SAXException {
        writer = new FileWriter(new File("hierarchy.txt"));
        String baseUrl = "my url here";
        queue.add(baseUrl);
        int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executor.execute(new QueueProcess(queue, writer, xmlHandler));
        }
        executor.shutdown();
    }
}

class QueueProcess implements Runnable {
    private AbstractQueue<String> queue;
    private HttpURLConnection connection;
    private URL url;
    private FileWriter writer;
    private SAXParserFactory factory = SAXParserFactory.newInstance();
    private SAXParser saxParser;
    private XMLHandler xmlHandler;

    public QueueProcess(AbstractQueue<String> queue, FileWriter writer, XMLHandler xmlHandler) {
        this.queue = queue;
        this.writer = writer;
        this.xmlHandler = xmlHandler;
    }

    @Override
    public void run() {
        try {
            saxParser = factory.newSAXParser();
            while (true) {
                String link = queue.poll();
                if (link != null) {
                    if (queue.size() >= 500) {
                        System.out.println("here" + " " + Thread.currentThread().getName());
                        getChildLinks(link);
                    } else {
                        System.out.println(link + " " + Thread.currentThread().getName());
                        queue.addAll(getChildLinks(link));
                    }
                }
            }
        } catch (IOException | SAXException | ParserConfigurationException e) {
            e.printStackTrace();
        }
    }

    private List<String> getChildLinks(String link) throws IOException, SAXException {
        url = new URL(link);
        connection = (HttpURLConnection) url.openConnection();
        connection.connect();
        String result = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines()
                .collect(Collectors.joining());
        saxParser.parse(new ByteArrayInputStream(result.getBytes()), xmlHandler);
        List<String> urlList = xmlHandler.getURLList();
        writer.write(result + System.lineSeparator());
        connection.disconnect();
        return urlList;
    }
}
The program runs fine, but at some point I get a NullPointerException. It occurs on the queue.addAll line in QueueProcess's run method.
Exception:
Exception in thread "pool-1-thread-3" java.lang.NullPointerException
at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
at QueueProcess.run(Hierarchy2.java:77)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
at QueueProcess.run(Hierarchy2.java:77)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I can't figure out why there is an NPE, since I check that the queue is not empty on every iteration of the while loop. Please tell me why I get a NullPointerException and how I can prevent it.
Update:
I've finally fixed the NPE. As suggested by @gusto2, it was caused by the ArrayList containing a null value, which the queue does not accept.
Now my code looks like this:
public class Hierarchy2 {
    private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static FileWriter writer;
    private static XMLHandler xmlHandler = new XMLHandler();

    public static void main(String[] args) throws IOException, ParserConfigurationException, SAXException {
        writer = new FileWriter(new File("hierarchy.txt"));
        String baseUrl = "my url here";
        queue.add(baseUrl);
        int threadCount = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executor.execute(new QueueProcess(queue, writer, xmlHandler));
        }
        executor.shutdown();
    }
}

class QueueProcess implements Runnable {
    private BlockingQueue<String> queue;
    private HttpURLConnection connection;
    private URL url;
    private FileWriter writer;
    private SAXParserFactory factory = SAXParserFactory.newInstance();
    private SAXParser saxParser;
    private XMLHandler xmlHandler = new XMLHandler();

    public QueueProcess(BlockingQueue<String> queue, FileWriter writer, XMLHandler xmlHandler) {
        this.queue = queue;
        this.writer = writer;
    }

    @Override
    public void run() {
        try {
            saxParser = factory.newSAXParser();
            while (true) {
                String link = queue.poll();
                if (link != null) {
                    System.out.println(link + " " + Thread.currentThread().getName());
                    queue.addAll(getChildLinks(link));
                }
            }
        } catch (IOException | SAXException | ParserConfigurationException e) {
            e.printStackTrace();
        }
    }

    private List<String> getChildLinks(String link) throws IOException, SAXException {
        url = new URL(link);
        connection = (HttpURLConnection) url.openConnection();
        connection.connect();
        String result = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines()
                .collect(Collectors.joining());
        saxParser.parse(new ByteArrayInputStream(result.getBytes()), xmlHandler);
        List<String> urlList = xmlHandler.getURLList();
        writer.write(result + System.lineSeparator());
        connection.disconnect();
        return urlList;
    }
}
The problem now is how to pause the threads once they have TOGETHER processed 500 records. Once 500 is reached, I have to create another file and then resume processing.
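One possible way to look at the 500-record requirement, sketched under assumptions rather than taken from the code above: instead of pausing the worker threads explicitly, all writes could go through one synchronized writer that counts records and switches to a new file when the limit is reached. Threads that arrive during the switch simply wait on the lock. The class name `RollingWriter`, the `prefix-N.txt` file naming, and the `filesCreated` helper are hypothetical.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RollingWriter implements AutoCloseable {
    private final Path dir;
    private final String prefix;
    private final int maxRecords;
    private BufferedWriter current;
    private int recordsInFile;
    private int fileIndex;

    public RollingWriter(Path dir, String prefix, int maxRecords) {
        this.dir = dir;
        this.prefix = prefix;
        this.maxRecords = maxRecords;
    }

    // synchronized: only one thread writes or rolls at a time,
    // so the other workers wait here while a new file is opened.
    public synchronized void write(String record) throws IOException {
        if (current == null || recordsInFile >= maxRecords) {
            roll();
        }
        current.write(record);
        current.newLine();
        recordsInFile++;
    }

    // Closes the current file (if any) and opens the next one.
    private void roll() throws IOException {
        if (current != null) {
            current.close();
        }
        fileIndex++;
        current = Files.newBufferedWriter(dir.resolve(prefix + "-" + fileIndex + ".txt"));
        recordsInFile = 0;
    }

    public synchronized int filesCreated() {
        return fileIndex;
    }

    @Override
    public synchronized void close() throws IOException {
        if (current != null) {
            current.close();
            current = null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("hierarchy");
        try (RollingWriter writer = new RollingWriter(dir, "hierarchy", 500)) {
            for (int i = 0; i < 1200; i++) {
                writer.write("record " + i);
            }
            System.out.println("files created: " + writer.filesCreated()); // prints "files created: 3"
        }
    }
}
```

In the worker, `writer.write(result + System.lineSeparator())` would become a call on a shared `RollingWriter` instance. Whether one response counts as one record is an assumption here.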
Also, please tell me how I can stop the program once the queue has been fully read, i.e. no more child links will be added to it. Since I am using an always-true while loop, the code runs indefinitely. If I use the condition while(!queue.isEmpty()) instead, only one of the threads will run, as the others will find the queue empty.
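A common pattern for this kind of termination problem, shown here only as a sketch: keep a counter of links that are either queued or currently being processed, and let the workers loop while that counter is above zero. An empty queue then no longer means "done", because a link still being processed keeps the counter positive. The names `CrawlUntilDone`, `crawl` and `fetchChildren` are hypothetical; `fetchChildren` stands in for `getChildLinks`, and the 100 ms poll timeout and one-hour wait are arbitrary choices.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class CrawlUntilDone {
    // Processes every link reachable from root and returns once nothing is queued or in flight.
    static Set<String> crawl(String root, Function<String, List<String>> fetchChildren, int threads)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> processed = ConcurrentHashMap.newKeySet();
        AtomicInteger pending = new AtomicInteger(1); // links queued or being processed
        queue.add(root);

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            executor.execute(() -> {
                try {
                    while (pending.get() > 0) {
                        String link = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (link == null) {
                            continue; // queue is empty for now; re-check the counter
                        }
                        try {
                            processed.add(link);
                            for (String child : fetchChildren.apply(link)) {
                                if (child != null) { // the queue rejects null elements
                                    pending.incrementAndGet(); // count the child before it is visible
                                    queue.add(child);
                                }
                            }
                        } finally {
                            pending.decrementAndGet(); // this link is finished
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        // In-memory stand-in for the remote hierarchy.
        Map<String, List<String>> tree = new HashMap<>();
        tree.put("root", Arrays.asList("a", "b"));
        tree.put("a", Arrays.asList("a1", "a2"));
        Set<String> processed = crawl("root", l -> tree.getOrDefault(l, Collections.emptyList()), 4);
        System.out.println(processed.size() + " links processed"); // prints "5 links processed"
    }
}
```

The counter is incremented before a child is added and decremented only after all children of a link have been queued, so it cannot reach zero while work remains. The sketch does not deduplicate links, so it assumes the hierarchy has no cycles.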
Upvotes: 0
Reputation: 786
Here you add the base URL to the queue only once. It is not in the loop.
queue.add(baseUrl);
int threadCount = Runtime.getRuntime().availableProcessors() + 1;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
    executor.execute(new QueueProcess(queue, writer, xmlHandler));
}
So when you call QueueProcess(queue, writer, xmlHandler), you pass a queue with a single entry. Then, when you call String link = queue.poll();, it removes that one value. How can queue.size() >= 500 ever be true if the queue you pass to QueueProcess(queue, writer, xmlHandler) in the for loop holds a single value?
Upvotes: 0
Reputation: 12087
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
at java.util.concurrent.ConcurrentLinkedQueue.checkNotNull(ConcurrentLinkedQueue.java:914)
at java.util.concurrent.ConcurrentLinkedQueue.addAll(ConcurrentLinkedQueue.java:525)
I'd guess that List<String> urlList = xmlHandler.getURLList(); returns an ArrayList with some null values inside. Without more information, though, it is hard to be more precise.
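To illustrate that guess with a small sketch (not the asker's actual handler): `ConcurrentLinkedQueue` does not permit null elements, so `addAll` throws a `NullPointerException` when the list it is given contains one. The class name `NullInQueueDemo`, its helper methods, and the example URLs are made up for illustration; filtering with `Objects::nonNull` is just one possible workaround.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

public class NullInQueueDemo {
    // Returns true if addAll rejects the list with a NullPointerException.
    static boolean addAllThrows(Queue<String> queue, List<String> links) {
        try {
            queue.addAll(links);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Drops null entries so the list is safe to hand to the queue.
    static List<String> withoutNulls(List<String> links) {
        return links.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> links = Arrays.asList("http://example.com/a", null, "http://example.com/b");
        Queue<String> queue = new ConcurrentLinkedQueue<>();

        System.out.println("addAll threw NPE: " + addAllThrows(queue, links)); // prints "addAll threw NPE: true"

        queue.clear();
        queue.addAll(withoutNulls(links));
        System.out.println("queued after filtering: " + queue.size()); // prints "queued after filtering: 2"
    }
}
```

A better long-term fix would probably be to find out why the handler produces a null entry in the first place, which needs the `XMLHandler` source.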
Upvotes: 1