Reputation: 29
I need someone to help, or at least a tip. I'm trying to read large files (100 MB to 11 GB) line by line and then store some data in a map.
var m = make(map[string]string)
var mutex sync.Mutex

// expensive func
func stress(s string, mutex sync.Mutex) {
	// some very costly operation ... that's why I want to use goroutines
	mutex.Lock()
	m[s] = s // store result
	mutex.Unlock()
}

func main() {
	file, err := os.Open("somefile.txt")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		if err = file.Close(); err != nil {
			fmt.Println(err)
			return
		}
	}()
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		go stress(scanner.Text(), mutex)
	}
}
Without goroutines it works fine, but slowly. As you can see, the file is large, so the loop will start a lot of goroutines. That raises two problems:
I suppose I should use a WaitGroup, but I cannot understand how it should be used. I also guess there should be some limit on the number of goroutines, maybe some counter. It would be great to run it in 5-20 goroutines.
UPD. Yes, as @user229044 mentioned, I have to pass the mutex by pointer. But the problem of limiting the goroutines started in the loop is still open.
UPD2. This is how I worked around the problem. I don't exactly understand how the program handles these goroutines, or what they cost in memory and CPU time. Also, almost all commenters point at the map, but the main problem was managing the goroutines at runtime. How many goroutines are spawned if the Scan() loop runs 10 billion iterations, and how are goroutines stored in RAM?
func stress(s string, mutex *sync.Mutex) {
	// a lot of costly ops
	// ...
	// ...
	mutex.Lock()
	m[where] = result // store result
	mutex.Unlock()
	wg.Done()
}

// main
for scanner.Scan() {
	wg.Add(1)
	go func(data string) {
		stress(data, &mutex)
	}(scanner.Text())
}
wg.Wait()
Upvotes: 0
Views: 1016
Reputation: 239250
Your specific problem is that you're copying the mutex by value, so every call to stress locks its own private copy and nothing is actually protected. You should pass a pointer to the mutex, so that a single instance is shared by all function invocations. You're also spawning an unbounded number of goroutines, which will eventually exhaust your system's memory.
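To illustrate the first point, here is a minimal sketch of sharing one mutex by pointer. The names store and storeAll are hypothetical helpers made up for this example, and the map is passed in explicitly rather than kept as a global; it is not meant as a fix for the throughput problem, only for the copy.

```go
package main

import (
	"fmt"
	"sync"
)

// store takes *sync.Mutex, so every caller locks the same mutex.
// With a plain sync.Mutex parameter each call would lock its own copy.
func store(m map[string]string, s string, mu *sync.Mutex) {
	mu.Lock()
	m[s] = s
	mu.Unlock()
}

// storeAll (hypothetical) starts one goroutine per line and waits for all
// of them. That is fine for a handful of lines, not for a multi-GB file.
func storeAll(lines []string) map[string]string {
	m := make(map[string]string) // a nil map would panic on write
	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, line := range lines {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			store(m, s, &mu)
		}(line)
	}
	wg.Wait()
	return m
}

func main() {
	m := storeAll([]string{"a", "b", "c"})
	fmt.Println(len(m))
}
```

Running go vet on the by-value version from the question should report that the mutex is being copied, which is a quick way to catch this class of mistake.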
However, even with both of those fixed, spawning more goroutines gains you nothing here: you are only wasting resources, and juggling all of those idle goroutines will probably cause a net loss of performance. Increased parallelism can't help when every parallel task has to wait for serialized access to a data structure, as is the case with your map. sync.WaitGroup and mutexes are the wrong approach here.
Instead, to add and control concurrency, you want a buffered channel and a single goroutine responsible for map inserts. This way you have one goroutine reading from the file and one inserting into the map, decoupling the disk I/O from the map insertion.
Something like this:
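scanner := bufio.NewScanner(file)
ch := make(chan string, 10)
done := make(chan struct{})
go func() {
	defer close(done)
	for s := range ch {
		m[s] = s
	}
}()
for scanner.Scan() {
	ch <- scanner.Text()
}
close(ch)
<-done // wait until the inserter has drained the channel before using m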
Upvotes: 1