Masood Ahmad

Reputation: 741

synchronized block and method not working as intended

Status: solved. Making the method and the queue variable static and using synchronized (Crawler.class) fixed the problem. Thanks, all!

http://pastie.org/8724549#41-42,46,49,100-101,188-189,191

The highlighted method and block are synchronized.

That block/method should be entered by only one thread at a time.

It should work like this: the first thread goes into the method and updates the size, and all others see that updated size. The update should be made only by the first thread, not by the others.

  1. Why is it even being run? It is being run by all 11 threads.
  2. It is running without waiting for the prior thread to finish. The "queue loaded, new size ------------" line shows every thread creating/adding elements.

package crawler;

import crawler.Main;
import static crawler.Main.basicDAO;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;


/**
 *
 * @author syncsys
 */
public class Crawler implements Runnable, InterfaceForConstants {
public static final String patternString = "[_A-Za-z0-9-]+(\\.[_A-Za-z0-9-]+)*@[A-Za-z0-9]+(\\.[A-Za-z0-9]+)*(\\.[A-Za-z]{2,})";
public ConcurrentLinkedQueue<Link> queue =  new ConcurrentLinkedQueue<Link>();
private volatile String url;





    private void crawl(String url) {


        synchronized (Crawler.class){
            System.out.println("queue size "+queue.size());
            if(queue.size() < (totalSizeOfEmailBucket / 3)){
                updateQueue();
            }
        System.out.println("This is inside of sync block.   -----------    queue size "+queue.size());
        }
        System.out.println("This is at the end of sync block.   -----------    queue size "+queue.size());





        BufferedReader bf = null;
        try {
            url = queue.poll().getLink();
            URL target = new URL(url);
            bf = new BufferedReader(
                    new InputStreamReader(target.openStream())
                 );
            StringBuilder html = new StringBuilder();
            String inputLine;
            while ((inputLine = bf.readLine()) != null) {
                html.append(inputLine);
            }
            List emailList = new ArrayList( getEmailList(html.toString()) );
           // List linkList = new ArrayList( getLinkList(html.toString(), url) );
            System.out.println("Just worked on --------- "+ url);
            Main.processedLinksCount++;
            if(emailList.size()>0){
                putEmailsInDB(emailList);
            }

            // putLinksInDB(linkList);
        } catch (IOException ex) {
            Logging.logError(ex.toString());
            basicDAO.deleteLink(url);
        } catch (Exception ex) {
            Logging.logError(ex.toString()); 
            basicDAO.deleteLink(url);
        }finally{
            if(bf !=null){
                try {
                bf.close();
                } catch (IOException ex) {
                    Logging.logError(ex.toString());
                }

            }
            crawl(null);
        }
    }

    public synchronized void  updateQueue() {



            Queue<Link> tempQueue =  new PriorityQueue<Link>();
            tempQueue = getNonProcessedLinkFromDB() ;

            queue.addAll(tempQueue);
            BasicDAO.markLinkAsProcesed(tempQueue);
            System.out.println("queue loaded, new  size ------------------------------------ "+queue.size());

    }

    private  List getLinkList(String html, String url) {
        Document doc = Jsoup.parse(html);
        Elements bodies = doc.select("body");
        List linkList =  new ArrayList();
        for(Element body : bodies ){
            Elements aTags = body.getElementsByTag("a");
            for (Element a: aTags){
               String link =  a.attr("href");
               if ( !(link.startsWith("#")) 
                     && 
                    !(link.contains("()"))
                     && 
                    !(link.endsWith(".jpg")) 
                     && 
                    !(link.endsWith(".jpeg"))  
                     && 
                    !(link.endsWith(".png"))  
                     && 
                    !(link.endsWith(".gif"))     ){

                    if( link.startsWith("/") ){
                        link = url+link;
                    }
                 linkList.add(link);
                 //put link in db
               }    
            }
        }

        return linkList;
    }

    private  List getEmailList(String html) {
        Pattern p = Pattern.compile(patternString);
        Matcher m = p.matcher(html);
        List emailList = new ArrayList();
        while(m.find()){
            emailList.add(m.group());
            Main.nonUniqueEmailsCount++;
        }

        return emailList;    
    }



    private Queue<Link> getNonProcessedLinkFromDB() {
        return ( basicDAO.getNonProcessedLink() );
    }

    private  void putEmailsInDB(List emailList) {
        basicDAO.insertEmail(emailList);
    }

    private  void putLinksInDB(List linkList) {
       basicDAO.insertLinks(linkList);
    }

    @Override
    public void run() {
        if(url != null){
            crawl(url);
        }else{
 //          crawl();
        }

    }
    public Crawler(String url){
        this.url = url;
    }

    public Crawler(){
        this.url =  null;
    }
}

Way of starting threads: not optimal, I know. No executor service or thread pool is used, but the following is valid code:

for (int i = 0; i < 11; i++) {
    try {
        Thread thread = new Thread(new Crawler("https://www.google.com.pk/?gws_rd=cr&ei=-q8vUqqNDIny4QTLlYCwAQ#q=pakistan"/*new BasicDAO().getNonProcessedLink()*/));
        System.out.println("resume with saved link true");

        thread.start();

        System.out.println("thread stared");
        threadList.add(thread);
        System.out.println("thread added to arraylist");
    } catch (Exception ex) {
        new Logging().logError(ex.toString());
    }
}

Debug output:

For 11 threads, the logs say:

queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
queue size 0
This is at the end of sync block.   -----------    queue size 1000
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
queue size 0
queue loaded, new  size ------------------------------------ 1000
This is inside of sync block.   -----------    queue size 1000
This is at the end of sync block.   -----------    queue size 1000
Just worked on --------- http://ao.com/Advice/Washing-Machines/Top-Tens/Top-Five-Washing-Machines/Advice/Freezers/Top-Tens/Top-Five-Freezers/flavel
queue size 999
Just worked on --------- http://ao.com/Advice/Washing-Machines/Top-Tens/Top-Five-Washing-Machines/l/fridges-width_less_than_50_cm/1-26/29-30//zanussi
queue loaded, new  size ------------------------------------ 1999
This is inside of sync block.   -----------    queue size 1999
This is at the end of sync block.   -----------    queue size 1999
queue size 999
queue loaded, new  size ------------------------------------ 1999
This is inside of sync block.   -----------    queue size 1999
This is at the end of sync block.   -----------    queue size 1999
Just worked on --------- http://ao.com/Advice/Washing-Machines/Top-Tens/Top-Five-Washing-Machines/Advice/Refrigerators/Top-Tens/Top-Five-Fridges/l/small_appliances-bodum/1-6/55/
queue size 999
queue loaded, new  size ------------------------------------ 1999
This is inside of sync block.   -----------    queue size 1999
This is at the end of sync block.   -----------    queue size 1999

Upvotes: 3

Views: 349

Answers (2)

Sotirios Delimanolis

Reputation: 279960

Assuming this

first thread goes in the method, updates size, all others see that size.

is your expectation, here's the root cause.

Your queue field is an instance field

public ConcurrentLinkedQueue<Link> queue =  new ConcurrentLinkedQueue<Link>();

and you have 11 instances

Thread thread = new Thread(new Crawler("https://www.google.com.pk/?gws_rd=cr&ei=-q8vUqqNDIny4QTLlYCwAQ#q=pakistan"/*new BasicDAO().getNonProcessedLink()*/)); 

so each Crawler has its own queue and therefore each thread enters this block

if(queue.size() < (totalSizeOfEmailBucket / 3)){
    updateQueue();
}

because each thread is updating a different ConcurrentLinkedQueue object.

It is running without waiting for the prior thread to finish.

This is not true. All the threads will block at

synchronized (Crawler.class){

but any code outside of that is fair game.
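
As a rough illustration of the point above, here is a minimal sketch in which the queue is a static field, so every instance shares one queue and the class-level lock guards a single object. The names SharedQueueDemo, Worker, THRESHOLD, REFILLS and refill() are made up for this example (refill() stands in for loading links from the database), so treat it as a sketch under those assumptions rather than a drop-in fix for the Crawler class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedQueueDemo {

    /** Hypothetical stand-in for the question's Crawler. */
    static class Worker implements Runnable {
        // static: a single queue shared by every Worker instance
        static final ConcurrentLinkedQueue<String> QUEUE = new ConcurrentLinkedQueue<>();
        // counts how many times the queue was refilled
        static final AtomicInteger REFILLS = new AtomicInteger();
        // assumed refill threshold, chosen only for the demo
        static final int THRESHOLD = 10;

        @Override
        public void run() {
            // One lock (the Class object) guards one shared queue.
            synchronized (Worker.class) {
                if (QUEUE.size() < THRESHOLD) {
                    refill();
                }
            }
            QUEUE.poll(); // take one item, outside the lock
        }

        // Placeholder for loading links from the database.
        private static void refill() {
            for (int i = 0; i < 1000; i++) {
                QUEUE.add("link-" + i);
            }
            REFILLS.incrementAndGet();
        }
    }

    /** Starts n workers, waits for them, and returns the refill count. */
    static int runWorkers(int n) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(new Worker());
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return Worker.REFILLS.get();
    }

    public static void main(String[] args) {
        System.out.println("refills: " + runWorkers(11)); // prints "refills: 1"
    }
}
```

With the static queue, only the first thread through the lock finds the queue below the threshold. If QUEUE were an instance field, each of the 11 workers would see its own empty queue and refill it, which matches the logs in the question.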

Upvotes: 1

aryann

Reputation: 949

The code will not behave as expected because of this line: synchronized (Crawler.class). It takes a lock on the Class object of Crawler, of which there is one per class loader, but your synchronized methods are non-static, so they lock on the individual Crawler instance instead. Replace the non-static synchronized methods with static synchronized methods and that should solve the problem.
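
To make the difference between the two lock targets concrete, here is a small sketch that uses Thread.holdsLock to report which monitor the current thread holds inside an instance synchronized method versus a static synchronized one. The class LockTargetDemo and its method names are invented for this illustration and are not part of the question's code.

```java
public class LockTargetDemo {

    // Instance synchronized method: the lock is this object's monitor.
    synchronized boolean[] fromInstanceMethod() {
        return new boolean[] {
            Thread.holdsLock(this),
            Thread.holdsLock(LockTargetDemo.class)
        };
    }

    // Static synchronized method: the lock is the Class object's monitor.
    static synchronized boolean[] fromStaticMethod(LockTargetDemo instance) {
        return new boolean[] {
            Thread.holdsLock(instance),
            Thread.holdsLock(LockTargetDemo.class)
        };
    }

    public static void main(String[] args) {
        LockTargetDemo demo = new LockTargetDemo();
        boolean[] a = demo.fromInstanceMethod();
        boolean[] b = fromStaticMethod(demo);
        // prints "instance method: holds this=true, holds class=false"
        System.out.println("instance method: holds this=" + a[0] + ", holds class=" + a[1]);
        // prints "static method: holds this=false, holds class=true"
        System.out.println("static method: holds this=" + b[0] + ", holds class=" + b[1]);
    }
}
```

A static synchronized method and a synchronized (LockTargetDemo.class) block therefore contend for the same monitor, while an instance synchronized method uses a separate monitor per object.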

Upvotes: 2
