Reputation: 2625
I am trying to load a big CSV file using goroutines in Go. The dimensions of the CSV are (254882, 100). When I parse the CSV with goroutines and store it in a 2D slice, I get fewer than 254882 rows, and the count varies on every run. I suspect the goroutines are the cause but can't pinpoint the reason. Can anyone please help me? I am also new to Go. Here is my code:
func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}
And my main function looks like the following
func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}
If anyone wants to download the CSV as well, here is the link (485 MB): https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing
Upvotes: 3
Views: 586
Reputation: 166626
Your results are undefined because you have data races.
~/gopath/src$ go run -race racer.go
==================
WARNING: DATA RACE
Write at 0x00c00008a060 by goroutine 6:
runtime.mapassign_faststr()
/home/peter/go/src/runtime/map_faststr.go:202 +0x0
main.main.func2()
/home/peter/gopath/src/racer.go:16 +0x6a
Previous write at 0x00c00008a060 by goroutine 5:
runtime.mapassign_faststr()
/home/peter/go/src/runtime/map_faststr.go:202 +0x0
main.main.func1()
/home/peter/gopath/src/racer.go:11 +0x6a
Goroutine 6 (running) created at:
main.main()
/home/peter/gopath/src/racer.go:14 +0x88
Goroutine 5 (running) created at:
main.main()
/home/peter/gopath/src/racer.go:9 +0x5b
==================
fatal error: concurrent map writes
==================
WARNING: DATA RACE
Write at 0x00c00009a088 by goroutine 6:
main.main.func2()
/home/peter/gopath/src/racer.go:16 +0x7f
Previous write at 0x00c00009a088 by goroutine 5:
main.main.func1()
/home/peter/gopath/src/racer.go:11 +0x7f
Goroutine 6 (running) created at:
main.main()
/home/peter/gopath/src/racer.go:14 +0x88
Goroutine 5 (running) created at:
main.main()
/home/peter/gopath/src/racer.go:9 +0x5b
==================
goroutine 34 [running]:
runtime.throw(0x49e156, 0x15)
/home/peter/go/src/runtime/panic.go:608 +0x72 fp=0xc000094718 sp=0xc0000946e8 pc=0x44b342
runtime.mapassign_faststr(0x48ace0, 0xc00008a060, 0x49c9c3, 0x8, 0xc00009a088)
/home/peter/go/src/runtime/map_faststr.go:211 +0x46c fp=0xc000094790 sp=0xc000094718 pc=0x43598c
main.main.func1(0x49c9c3, 0x8)
/home/peter/gopath/src/racer.go:11 +0x6b fp=0xc0000947d0 sp=0xc000094790 pc=0x47ac6b
runtime.goexit()
/home/peter/go/src/runtime/asm_amd64.s:1340 +0x1 fp=0xc0000947d8 sp=0xc0000947d0 pc=0x473061
created by main.main
/home/peter/gopath/src/racer.go:9 +0x5c
goroutine 1 [sleep]:
time.Sleep(0x5f5e100)
/home/peter/go/src/runtime/time.go:105 +0x14a
main.main()
/home/peter/gopath/src/racer.go:19 +0x96
goroutine 35 [runnable]:
main.main.func2(0x49c9c3, 0x8)
/home/peter/gopath/src/racer.go:16 +0x6b
created by main.main
/home/peter/gopath/src/racer.go:14 +0x89
exit status 2
~/gopath/src$
racer.go:
package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "strconv"
    "sync"
    "time"
)

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("/home/peter/AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}
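For reference, the same failure mode shows up in a much smaller program (a hypothetical sketch, not your data or file): several goroutines appending to one shared slice without synchronization drop rows, and go run -race flags it.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var rows [][]float64
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            // Unsynchronized append mutates the shared slice header: a data race.
            rows = append(rows, []float64{float64(i)})
        }(i)
    }
    wg.Wait()
    // Typically prints fewer than 1000; run with `go run -race` to see the reports.
    fmt.Println(len(rows))
}

append both reads and writes the shared slice header, so two goroutines calling it concurrently race even though each parsed row is independent.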
Upvotes: 6
Reputation: 1571
There is no need to use *[][]float64 here; a slice header already contains a pointer to its underlying array, so returning [][]float64 directly is enough. I have made some minor modifications to your program.

dataset is visible to the new goroutines because it is declared in the enclosing scope. record is visible too, but since the record variable changes on every iteration, it has to be passed to each goroutine as an argument. dataset does not need to be passed: it is the one shared value we want, so that every goroutine can append its temp slice to it.

The race condition happens because multiple goroutines try to append to that same variable, i.e., multiple goroutines write to the same memory concurrently. We need to ensure that only one goroutine appends at any instant, so a mutex is used to make the appends sequential.
package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "strconv"
    "sync"
)

func loadCSV(csvFile string) [][]float64 {
    var dataset [][]float64

    f, _ := os.Open(csvFile)
    r := csv.NewReader(f)

    var wg sync.WaitGroup
    l := new(sync.Mutex) // lock

    for record, err := r.Read(); err == nil; record, err = r.Read() {
        wg.Add(1)
        go func(record []string) {
            defer wg.Done()
            var temp []float64
            for _, each := range record {
                if f, err := strconv.ParseFloat(each, 64); err == nil {
                    temp = append(temp, f)
                }
            }
            l.Lock()                        // lock before writing
            dataset = append(dataset, temp) // write
            l.Unlock()                      // unlock
        }(record)
    }
    wg.Wait()

    return dataset
}

func main() {
    dataset := loadCSV("train.csv")
    fmt.Println(len(dataset))
}
Some errors are not handled here to keep the example minimal, but you should handle them in real code.
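As an alternative to the lock (a sketch of my own, not a drop-in replacement; loadCSVIndexed and train.csv are placeholder names): read all records first with ReadAll, preallocate the result, and let each goroutine write only to its own index. Distinct elements of a slice are distinct memory locations, so no mutex is needed and the row order is preserved.

package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "strconv"
    "sync"
)

// loadCSVIndexed reads all records up front, then parses them concurrently.
// Each goroutine writes only to its own index, so no mutex is required.
func loadCSVIndexed(csvFile string) ([][]float64, error) {
    f, err := os.Open(csvFile)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    records, err := csv.NewReader(f).ReadAll()
    if err != nil {
        return nil, err
    }

    dataset := make([][]float64, len(records)) // one slot per row
    var wg sync.WaitGroup
    for i, record := range records {
        wg.Add(1)
        go func(i int, record []string) {
            defer wg.Done()
            row := make([]float64, 0, len(record))
            for _, each := range record {
                if v, err := strconv.ParseFloat(each, 64); err == nil {
                    row = append(row, v)
                }
            }
            dataset[i] = row // each goroutine writes a different element: no race
        }(i, record)
    }
    wg.Wait()
    return dataset, nil
}

func main() {
    dataset, err := loadCSVIndexed("train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(dataset))
}

The trade-off is that ReadAll keeps every raw string record in memory before parsing starts, which matters for very large files.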
Upvotes: 3