Shamelezz

Reputation: 29

Limit goroutines in a loop

I need someone to help, or at least give a tip. I'm trying to read large files (100 MB - 11 GB) line by line and then store some data in a map.

var m map[string]string

// expensive func
func stress(s string, mutex sync.Mutex)  {
    // some very cost operation .... that's why I want to use goroutines
    mutex.Lock()
    m[s] = s // store result
    mutex.Unlock()
}

func main() {
    file, err := os.Open("somefile.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        if err = file.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        go stress(scanner.Text(), mutex)
    }
}

Without goroutines it works fine but slowly. As you can see, the file is large, so the loop spawns a lot of goroutines, and that causes two problems:

  1. Sometimes the mutex doesn't work properly and the program crashes. (How many goroutines can a mutex support?)
  2. Every run, some data is simply lost (but the program doesn't crash)

I suppose I should use a WaitGroup, but I can't figure out how. I also guess there should be some limit on the number of goroutines, maybe a counter. It would be great to run it with 5-20 goroutines.


UPD. Yes, as @user229044 mentioned, I have to pass the mutex by pointer. But the problem of limiting the goroutines within the loop remains.


UPD2. This is how I worked around the problem. I don't fully understand how the program handles these goroutines, or what the memory and CPU cost is. Also, almost all commenters point at the map structure, but the main problem was managing the goroutines at runtime. How many goroutines get spawned if the Scan() loop runs 10 billion iterations, and how are the goroutines stored in RAM?

func stress(s string, mutex *sync.Mutex)  {
    // a lot of costly ops
    // ...
    // ...
    mutex.Lock()
    m[where] = result // store result
    mutex.Unlock()
    wg.Done()
}

// main
for scanner.Scan() {
    wg.Add(1)
    go func(data string) {
        stress(data, &mutex)
    }(scanner.Text())
}
wg.Wait()

Upvotes: 0

Views: 1016

Answers (1)

user229044

Reputation: 239250

Your specific problem is that you're copying the mutex by value. You should pass a pointer to the mutex, so that a single instance of your mutex is shared by all function invocations. You're also spawning an unbounded number of goroutines, which will eventually exhaust your system's memory.

However, you can spawn as many goroutines as you want and you're only wasting resources for no gain, and juggling all of those useless goroutines will probably cause a net loss of performance. Increased parallelism can't help you when every parallel process has to wait for serialized access to a data structure, as is the case with your map. sync.WaitGroup and mutexes are the wrong approach here.

Instead, to add and control concurrency, you want a buffered channel and a single goroutine responsible for map inserts. This way you have one process reading from the file and one process inserting into the map, decoupling the disk IO from the map insertion.

Something like this:

scanner := bufio.NewScanner(file)

ch := make(chan string, 10)
done := make(chan struct{})

go func() {
    for s := range ch {
        m[s] = s
    }
    close(done)
}()

for scanner.Scan() {
    ch <- scanner.Text()
}

close(ch)
<-done // wait for the inserter to drain the channel before exiting

Upvotes: 1
