How to make concurrent GET requests from url pool

I completed the suggested Go tour, watched some tutorials and Gopher conference talks on YouTube, and that's pretty much it.

I have a project which requires me to send GET requests and store the results in files. The number of URLs is around 80 million.

I'm testing with 1000 URLs only.

Problem: I don't think I've managed to make it concurrent, although I've followed some guidelines, and I don't know what's wrong. But maybe it is concurrent and it just doesn't seem fast to me; the speed feels like sequential requests.

Here is the code I've written:

package main

import (
    "bufio"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup // synchronization to wait for all the goroutines

func crawler(urlChannel <-chan string) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second} // single client is sufficient for multiple requests

    for urlItem := range urlChannel {
        req1, _ := http.NewRequest("GET", "http://"+urlItem, nil)                                           // generating the request
        req1.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
        resp1, respErr1 := client.Do(req1)                                                                  // sending the prepared request and getting the response
        if respErr1 != nil {
            continue
        }

        defer resp1.Body.Close()

        if resp1.StatusCode/100 == 2 { // means server responded with 2xx code
            text1, readErr1 := ioutil.ReadAll(resp1.Body) // try to read the sourcecode of the website
            if readErr1 != nil {
                log.Fatal(readErr1)
            }

            f1, fileErr1 := os.Create("200/" + urlItem + ".txt") // creating the relative file
            if fileErr1 != nil {
                log.Fatal(fileErr1)
            }
            defer f1.Close()

            _, writeErr1 := f1.Write(text1) // writing the sourcecode into our file
            if writeErr1 != nil {
                log.Fatal(writeErr1)
            }
        }
    }
}

func main() {
    file, err := os.Open("urls.txt") // the file containing the url's
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close() // don't forget to close the file

    urlChannel := make(chan string, 1000) // create a channel to store all the url's

    scanner := bufio.NewScanner(file) // each line has another url
    for scanner.Scan() {
        urlChannel <- scanner.Text()
    }
    close(urlChannel)

    _ = os.Mkdir("200", 0755) // if it's there, it will create an error, and we will simply ignore it
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go crawler(urlChannel)
    }
    wg.Wait()
}

My question is: why is this code not working concurrently, and how can I solve the problem I've mentioned above? Is there something I'm doing wrong when making concurrent GET requests?

Upvotes: 2

Views: 988

Answers (2)

erik258

Reputation: 16305

Here's some code to get you thinking. I put the URLs in the code so it is self-sufficient, but in practice you'd probably pipe them in on stdin. There are a few things I'm doing here that I think are improvements, or at least worth thinking about.

Before we get started, I'll point out that I put the complete URL in the input stream. For one thing, this lets me support both http and https. I don't really see the logic behind hard-coding the scheme in the code rather than leaving it in the data.

First, it can handle arbitrarily sized response bodies (your version reads each body fully into memory, so a handful of concurrent large responses could exhaust it). I do this with io.Copy().

[edited]

text1, readErr1 := ioutil.ReadAll(resp1.Body) reads the entire HTTP body. If the body is large, it will take up lots of memory. io.Copy(f1, resp1.Body) would instead copy the data from the response body directly to the file, without having to hold the whole thing in memory. It may be done in one Read/Write or many.

http.Response.Body is an io.ReadCloser because the HTTP protocol expects the body to be read progressively. http.Response does not hold the entire body until it has been read; that's why it's not just a []byte. Writing it to the filesystem progressively while the data "streams" in from the TCP socket means that a finite amount of system resources can download an unlimited amount of data.
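
A rough sketch of that streaming pattern (the helper name and the destination path are placeholders, not part of the full program below):

import (
    "fmt"
    "io"
    "net/http"
    "os"
)

// saveBody streams an HTTP response body straight into a file instead of
// buffering the whole thing in memory.
func saveBody(resp *http.Response, path string) (int64, error) {
    f, err := os.Create(path)
    if err != nil {
        return 0, fmt.Errorf("creating %s: %w", path, err)
    }
    defer f.Close()
    // io.Copy reads from the body and writes to the file in chunks,
    // so memory use stays bounded no matter how large the body is.
    return io.Copy(f, resp.Body)
}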

But there's even more benefit. io.Copy will call ReadFrom() on the file. If you look at the Linux implementation (for example, https://golang.org/src/os/readfrom_linux.go) and dig a bit, you'll see it actually uses copy_file_range. That system call is cool because:

The copy_file_range() system call performs an in-kernel copy between two file descriptors without the additional cost of transferring data from the kernel to user space and then back into the kernel.

*os.File knows how to ask the kernel to deliver data directly from the TCP socket to the file without your program even having to touch it.

See https://golang.org/pkg/io/#Copy.

Second, I make sure to use all the URL components in the filename, so URLs with different query strings go to different files. The fragment probably doesn't differentiate response bodies, so including it in the path may be ill-considered. There's no great heuristic for turning URLs into valid file paths. If this were a serious task, I'd probably store the data in files named by a shasum of the URL or something, and create an index of results in a metadata file, as sketched below.
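
A rough sketch of that shasum approach might look like this (the directory layout and file extension are just assumptions for illustration; a real version would also keep an index mapping URL to hash):

import (
    "crypto/sha256"
    "encoding/hex"
    "path/filepath"
)

// pathForURL maps a full URL to a stable, filesystem-safe path by hashing it.
func pathForURL(rawURL string) string {
    sum := sha256.Sum256([]byte(rawURL))
    name := hex.EncodeToString(sum[:])
    // fan out by the first two hex characters so no single directory
    // ends up holding millions of files
    return filepath.Join("bodies", name[:2], name+".txt")
}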

Third, I handle all errors. req1, _ := http.NewRequest(... might seem like a convenient shortcut, but what it really means is that, at best, you won't know the real cause of any errors. I usually add some descriptive text to errors as they percolate up, so I can easily tell which error I'm returning.
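
For instance, something along these lines (an illustrative helper, not taken from the program below):

import (
    "fmt"
    "net/http"
    "net/url"
)

// buildRequest keeps the error instead of discarding it, and adds enough
// context to tell which URL caused it when the error percolates up.
func buildRequest(u *url.URL) (*http.Request, error) {
    req, err := http.NewRequest("GET", u.String(), nil)
    if err != nil {
        return nil, fmt.Errorf("building request for %s: %w", u.String(), err)
    }
    req.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0")
    return req, nil
}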

Finally, I return the count of successfully processed URLs so that I can see the final results. When scanning millions of URLs, you'd probably also want a list of which ones failed, but a count of successes is a good start at sending final data back for a summary.

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "path/filepath"
    "time"
)

const urls_text = `http://danf.us/
https://farrellit.net/?3=2&#1
`

func crawler(urls <-chan *url.URL, done chan<- int) {
    var processed int = 0
    defer func() { done <- processed }()
    client := http.Client{Timeout: 10 * time.Second}
    for u := range urls {
        if req, err := http.NewRequest("GET", u.String(), nil); err != nil {
            log.Printf("Couldn't create new request for %s: %s", u.String(), err.Error())
        } else {
            req.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
            if res, err := client.Do(req); err != nil {
                log.Printf("Failed to get %s: %s", u.String(), err.Error())
            } else {
                filename := filepath.Base(u.EscapedPath())
                if filename == "/" || filename == "" {
                    filename = "response"
                } else {
                    log.Printf("URL Filename is '%s'", filename)
                }
                destpath := filepath.Join(
                    res.Status, u.Scheme, u.Hostname(), u.EscapedPath(),
                    fmt.Sprintf("?%s",u.RawQuery), fmt.Sprintf("#%s",u.Fragment), filename,
                )
                if err := os.MkdirAll(filepath.Dir(destpath), 0755); err != nil {
                    log.Printf("Couldn't create directory %s: %s", filepath.Dir(destpath), err.Error())
                } else if f, err := os.OpenFile(destpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
                    log.Printf("Couldn't open destination file %s: %s", destpath, err.Error())
                } else {
                    if b, err := io.Copy(f, res.Body); err != nil {
                        log.Printf("Could not copy %s body to %s: %s", u.String(), destpath, err.Error())
                    } else {
                        log.Printf("Copied %d bytes from body of %s to %s", b, u.String(), destpath)
                        processed++
                    }
                    f.Close()
                }
                res.Body.Close()
            }
        }
    }
}

const workers = 3

func main() {
    urls := make(chan *url.URL)
    done := make(chan int)
    var submitted int = 0
    var inputted int = 0
    var successful int = 0
    for i := 0; i < workers; i++ {
        go crawler(urls, done)
    }
    sc := bufio.NewScanner(bytes.NewBufferString(urls_text))
    for sc.Scan() {
        inputted++
        if u, err := url.Parse(sc.Text()); err != nil {
            log.Printf("Could not parse %s as url: %w", sc.Text(), err)
        } else {
            submitted++
            urls <- u
        }
    }
    close(urls)
    for i := 0; i < workers; i++ {
        successful += <-done
    }
    log.Printf("%d urls input, %d could not be parsed. %d/%d valid URLs successful (%.0f%%)",
        inputted, inputted-submitted,
        successful, submitted,
        float64(successful)/float64(submitted)*100.0,
    )
}

Upvotes: 2

Emin Laletovic

Reputation: 4324

When setting up a concurrent pipeline, a good guideline is to first set up and start the listeners that will execute concurrently (in your case, the crawlers), and only then start feeding them data through the pipeline (in your case, the urlChannel).

In your example, the only thing preventing a deadlock is that you've created a buffered channel with the same capacity as the number of rows in your test file (1000). The code sends every URL into urlChannel before any crawler has been started; since there are only 1000 rows in the file, the buffered channel can absorb all of them without blocking. If you put more URLs in the file, the send will block once the urlChannel fills up, and because no crawler is receiving yet, the program deadlocks.
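
To see the ordering issue in isolation, here is a minimal, self-contained sketch using an unbuffered channel (the worker body is just a stand-in for your crawler):

package main

import (
    "fmt"
    "sync"
)

func main() {
    jobs := make(chan string) // unbuffered: every send waits for a ready receiver
    var wg sync.WaitGroup

    // start the receivers first
    wg.Add(3)
    for i := 0; i < 3; i++ {
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                fmt.Printf("worker %d got %s\n", id, job)
            }
        }(i)
    }

    // only now feed the channel; if the goroutines above were started after
    // this loop, the very first send would block forever and deadlock
    for _, u := range []string{"example.com", "example.org", "example.net"} {
        jobs <- u
    }
    close(jobs)
    wg.Wait()
}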

Here is the version of the code that should work:

package main

import (
    "bufio"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

func crawler(wg *sync.WaitGroup, urlChannel <-chan string) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second} // single client is sufficient for multiple requests

    for urlItem := range urlChannel {
        req1, _ := http.NewRequest("GET", "http://"+urlItem, nil)                                           // generating the request
        req1.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
        resp1, respErr1 := client.Do(req1)                                                                  // sending the prepared request and getting the response
        if respErr1 != nil {
            continue
        }

        if resp1.StatusCode/100 == 2 { // means server responded with 2xx code
            text1, readErr1 := ioutil.ReadAll(resp1.Body) // try to read the sourcecode of the website
            if readErr1 != nil {
                log.Fatal(readErr1)
            }

            f1, fileErr1 := os.Create("200/" + urlItem + ".txt") // creating the relative file
            if fileErr1 != nil {
                log.Fatal(fileErr1)
            }

            _, writeErr1 := f1.Write(text1) // writing the sourcecode into our file
            if writeErr1 != nil {
                log.Fatal(writeErr1)
            }
            f1.Close()
        }
        resp1.Body.Close() // close the body for every response, not only the 2xx ones
    }
}

func main() {
    var wg sync.WaitGroup
    file, err := os.Open("urls.txt") // the file containing the url's
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close() // don't forget to close the file

    urlChannel := make(chan string) 

    _ = os.Mkdir("200", 0755) // if it's there, it will create an error, and we will simply ignore it

    // first, initialize crawlers
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go crawler(&wg, urlChannel)
    }

    //after crawlers are initialized, start feeding them data through the channel
    scanner := bufio.NewScanner(file) // each line has another url
    for scanner.Scan() {
        urlChannel <- scanner.Text()
    }
    close(urlChannel)
    wg.Wait()
}

Upvotes: 1
