Reputation: 741
Status: Making the method and the queue variable static, and synchronizing on Crawler.class, solved the problem. Thanks, all!
http://pastie.org/8724549#41-42,46,49,100-101,188-189,191
The highlighted method/block is synchronized.
That block/method should be accessed by only one thread at a time.
It should work like this: the first thread enters the method and updates the size, and all the other threads then see the updated size. Only the first thread should make the update, not the others.
Instead, every thread prints "queue loaded, new size ------------", so each one is creating/adding elements.
package crawler;
import crawler.Main;
import static crawler.Main.basicDAO;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
/**
*
* @author syncsys
*/
public class Crawler implements Runnable, InterfaceForConstants {
    public static final String patternString = "[_A-Za-z0-9-]+(\\.[_A-Za-z0-9-]+)*@[A-Za-z0-9]+(\\.[A-Za-z0-9]+)*(\\.[A-Za-z]{2,})";
    public ConcurrentLinkedQueue<Link> queue = new ConcurrentLinkedQueue<Link>();
    private volatile String url;

    private void crawl(String url) {
        synchronized (Crawler.class){
            System.out.println("queue size "+queue.size());
            if(queue.size() < (totalSizeOfEmailBucket / 3)){
                updateQueue();
            }
            System.out.println("This is inside of sync block. ----------- queue size "+queue.size());
        }
        System.out.println("This is at the end of sync block. ----------- queue size "+queue.size());
        BufferedReader bf = null;
        try {
            url = queue.poll().getLink();
            URL target = new URL(url);
            bf = new BufferedReader(
                new InputStreamReader(target.openStream())
            );
            StringBuilder html = new StringBuilder();
            String inputLine;
            while ((inputLine = bf.readLine()) != null) {
                html.append(inputLine);
            }
            List emailList = new ArrayList( getEmailList(html.toString()) );
            // List linkList = new ArrayList( getLinkList(html.toString(), url) );
            System.out.println("Just worked on --------- "+ url);
            Main.processedLinksCount++;
            if(emailList.size()>0){
                putEmailsInDB(emailList);
            }
            // putLinksInDB(linkList);
        } catch (IOException ex) {
            Logging.logError(ex.toString());
            basicDAO.deleteLink(url);
        } catch (Exception ex) {
            Logging.logError(ex.toString());
            basicDAO.deleteLink(url);
        }finally{
            if(bf !=null){
                try {
                    bf.close();
                } catch (IOException ex) {
                    Logging.logError(ex.toString());
                }
            }
            crawl(null);
        }
    }

    public synchronized void updateQueue() {
        Queue<Link> tempQueue = new PriorityQueue<Link>();
        tempQueue = getNonProcessedLinkFromDB() ;
        queue.addAll(tempQueue);
        BasicDAO.markLinkAsProcesed(tempQueue);
        System.out.println("queue loaded, new size ------------------------------------ "+queue.size());
    }
    private List getLinkList(String html, String url) {
        Document doc = Jsoup.parse(html);
        Elements bodies = doc.select("body");
        List linkList = new ArrayList();
        for(Element body : bodies ){
            Elements aTags = body.getElementsByTag("a");
            for (Element a: aTags){
                String link = a.attr("href");
                if ( !(link.startsWith("#"))
                        && !(link.contains("()"))
                        && !(link.endsWith(".jpg"))
                        && !(link.endsWith(".jpeg"))
                        && !(link.endsWith(".png"))
                        && !(link.endsWith(".gif")) ){
                    if( link.startsWith("/") ){
                        link = url+link;
                    }
                    linkList.add(link);
                    //put link in db
                }
            }
        }
        return linkList;
    }
    private List getEmailList(String html) {
        Pattern p = Pattern.compile(patternString);
        Matcher m = p.matcher(html);
        List emailList = new ArrayList();
        while(m.find()){
            emailList.add(m.group());
            Main.nonUniqueEmailsCount++;
        }
        return emailList;
    }

    private Queue<Link> getNonProcessedLinkFromDB() {
        return ( basicDAO.getNonProcessedLink() );
    }

    private void putEmailsInDB(List emailList) {
        basicDAO.insertEmail(emailList);
    }

    private void putLinksInDB(List linkList) {
        basicDAO.insertLinks(linkList);
    }

    @Override
    public void run() {
        if(url != null){
            crawl(url);
        }else{
            // crawl();
        }
    }

    public Crawler(String url){
        this.url = url;
    }

    public Crawler(){
        this.url = null;
    }
}
How the threads are started (not optimal, I know: no executor service or pool is used, but the following is valid code):
for (int i = 0; i < 11; i++) {
    try {
        Thread thread = new Thread(new Crawler("https://www.google.com.pk/?gws_rd=cr&ei=-q8vUqqNDIny4QTLlYCwAQ#q=pakistan"/*new BasicDAO().getNonProcessedLink()*/));
        System.out.println("resume with saved link true");
        thread.start();
        System.out.println("thread stared");
        threadList.add(thread);
        System.out.println("thread added to arraylist");
    } catch (Exception ex) {
        new Logging().logError(ex.toString());
    }
}
Debug output:
For 11 threads, the log says:
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
queue size 0
This is at the end of sync block. ----------- queue size 1000
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
queue size 0
queue loaded, new size ------------------------------------ 1000
This is inside of sync block. ----------- queue size 1000
This is at the end of sync block. ----------- queue size 1000
Just worked on --------- http://ao.com/Advice/Washing-Machines/Top-Tens/Top-Five-Washing-Machines/Advice/Freezers/Top-Tens/Top-Five-Freezers/flavel
queue size 999
Just worked on --------- http://ao.com/Advice/Washing-Machines/Top-Tens/Top-Five-Washing-Machines/l/fridges-width_less_than_50_cm/1-26/29-30//zanussi
queue loaded, new size ------------------------------------ 1999
This is inside of sync block. ----------- queue size 1999
This is at the end of sync block. ----------- queue size 1999
queue size 999
queue loaded, new size ------------------------------------ 1999
This is inside of sync block. ----------- queue size 1999
This is at the end of sync block. ----------- queue size 1999
Just worked on --------- http://ao.com/Advice/Washing-Machines/Top-Tens/Top-Five-Washing-Machines/Advice/Refrigerators/Top-Tens/Top-Five-Fridges/l/small_appliances-bodum/1-6/55/
queue size 999
queue loaded, new size ------------------------------------ 1999
This is inside of sync block. ----------- queue size 1999
This is at the end of sync block. ----------- queue size 1999
Upvotes: 3
Views: 349
Reputation: 279960
Assuming this
"first thread goes in the method, updates size, all others see that size."
is your expectation, here's the root cause.
Your queue field is an instance field
public ConcurrentLinkedQueue<Link> queue = new ConcurrentLinkedQueue<Link>();
and you have 11 instances
Thread thread = new Thread(new Crawler("https://www.google.com.pk/?gws_rd=cr&ei=-q8vUqqNDIny4QTLlYCwAQ#q=pakistan"/*new BasicDAO().getNonProcessedLink()*/));
so each Crawler has its own queue, and therefore each thread enters this block
if(queue.size() < (totalSizeOfEmailBucket / 3)){
updateQueue();
}
because each thread is updating a different ConcurrentLinkedQueue object.
"it is running without waiting for the prior thread to finish."
This is not true. All the threads will block at
synchronized (Crawler.class){
but any code outside of that is fair game.
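The difference between an instance queue and a shared one can be sketched with a small stand-in for the crawler. Everything in this example (QueueScopeDemo, the worker classes, LOW_WATER_MARK, BATCH_SIZE, the "link-N" strings) is hypothetical and only mirrors the shape of the code in the question; it counts how many threads end up refilling a queue when the queue is per instance versus static.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the Crawler class; all names here are hypothetical.
class QueueScopeDemo {
    static final int LOW_WATER_MARK = 3; // refill when fewer items than this remain
    static final int BATCH_SIZE = 10;    // items added per refill

    // Mirrors the question: the queue is an instance field, one per worker.
    static class PerInstanceWorker implements Runnable {
        private final Queue<String> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger refills;

        PerInstanceWorker(AtomicInteger refills) { this.refills = refills; }

        @Override
        public void run() {
            synchronized (QueueScopeDemo.class) {
                refillIfLow(queue, refills);
            }
        }
    }

    // Mirrors the fix: the queue is static, so every worker sees the same one.
    static class SharedWorker implements Runnable {
        static final Queue<String> QUEUE = new ConcurrentLinkedQueue<>();
        private final AtomicInteger refills;

        SharedWorker(AtomicInteger refills) { this.refills = refills; }

        @Override
        public void run() {
            synchronized (QueueScopeDemo.class) {
                refillIfLow(QUEUE, refills);
            }
        }
    }

    // Adds one batch and counts the refill, but only if the queue is low.
    static void refillIfLow(Queue<String> queue, AtomicInteger refills) {
        if (queue.size() < LOW_WATER_MARK) {
            for (int i = 0; i < BATCH_SIZE; i++) {
                queue.add("link-" + i);
            }
            refills.incrementAndGet();
        }
    }

    // Starts one thread per worker and waits for all of them to finish.
    static void runAll(List<Runnable> workers) {
        List<Thread> threads = new ArrayList<>();
        for (Runnable worker : workers) {
            Thread t = new Thread(worker);
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static int perInstanceRefills(int workerCount) {
        AtomicInteger refills = new AtomicInteger();
        List<Runnable> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(new PerInstanceWorker(refills));
        }
        runAll(workers);
        return refills.get();
    }

    static int sharedRefills(int workerCount) {
        SharedWorker.QUEUE.clear(); // start from an empty shared queue
        AtomicInteger refills = new AtomicInteger();
        List<Runnable> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(new SharedWorker(refills));
        }
        runAll(workers);
        return refills.get();
    }

    public static void main(String[] args) {
        // Every worker finds its own queue empty, so all 11 refill.
        System.out.println("per-instance queues: " + perInstanceRefills(11) + " refills");
        // Only the first worker to take the lock refills; the rest see 10 items.
        System.out.println("shared static queue: " + sharedRefills(11) + " refills");
    }
}
```

With 11 workers, the per-instance variant refills 11 times and the shared variant refills once, which matches the repeated "queue loaded" lines in the log above. Passing a single queue object into each worker's constructor would be another way to share it without a static field.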
Upvotes: 1
Reputation: 949
The code will not behave as expected because of this line: synchronized (Crawler.class). You have taken a lock on the Class object of Crawler, of which there is one per classloader, but your synchronized methods are non-static, so they lock on the individual instance (this) rather than on the Class object. Replace the non-static synchronized methods with static synchronized methods and that should solve the problem.
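To see which monitor each form of synchronized actually takes, here is a minimal sketch using Thread.holdsLock from the standard library. The class LockTargetDemo and its method names are made up for illustration and are not part of the question's code.

```java
// Hypothetical demo class; only Thread.holdsLock is a real JDK API here.
class LockTargetDemo {

    // A non-static synchronized method locks on the instance ('this').
    synchronized boolean instanceMethodHoldsThis() {
        return Thread.holdsLock(this);
    }

    // ...and it does NOT hold the lock on the Class object.
    synchronized boolean instanceMethodHoldsClassLock() {
        return Thread.holdsLock(LockTargetDemo.class);
    }

    // A static synchronized method locks on the Class object.
    static synchronized boolean staticMethodHoldsClassLock() {
        return Thread.holdsLock(LockTargetDemo.class);
    }

    // synchronized (X.class) takes the same lock as a static synchronized method.
    boolean classBlockHoldsClassLock() {
        synchronized (LockTargetDemo.class) {
            return Thread.holdsLock(LockTargetDemo.class);
        }
    }

    public static void main(String[] args) {
        LockTargetDemo demo = new LockTargetDemo();
        System.out.println("instance method holds this:        " + demo.instanceMethodHoldsThis());        // true
        System.out.println("instance method holds class lock:  " + demo.instanceMethodHoldsClassLock());   // false
        System.out.println("static method holds class lock:    " + staticMethodHoldsClassLock());          // true
        System.out.println("class-level block holds class lock: " + demo.classBlockHoldsClassLock());      // true
    }
}
```

So a synchronized (Crawler.class) block and a non-static synchronized method guard with two different locks, while a static synchronized method uses the same lock as the class-level block.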
Upvotes: 2