Reputation: 9682
I have a project that tries to run an unlimited number of BigQuery queries at the same time in Go. The parent project is all Python. I need to keep track of the query results by name, for example in a map.
Input:
{
'reports_portal': 'select * from reports_portal',
'billing_portal': 'select * from billing_portal',
}
output:
{
'reports_portal': [23, 123, 5234, 632],
'billing_portal': [23, 123, 5234, 632],
}
and so on.
These BigQuery queries need to run asynchronously because they're very slow (from a UI perspective, an SRE waits 15-30 seconds for results).
I first tried writing items to a map asynchronously:
package main
import (
"fmt"
)
func add_to_map(m map[string] string, word string) map[string]string {
added_word := word + " plus more letters"
m[word] = added_word
return m
}
func main() {
words_map := make(map[string]string)
words := []string{"giraffe", "cat", "dog", "turtle"}
for _, this_word := range words {
go add_to_map(words_map, this_word)
}
fmt.Println(words_map)
}
It blows up like this:
$ go run try_asynchronous.go
fatal error: concurrent map writes
goroutine 7 [running]:
runtime.throw(0x10b3b96, 0x15)
/usr/local/Cellar/go/1.8.1/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420032eb8 sp=0xc420032e98
runtime.mapassign(0x109ad20, 0xc420016270, 0xc420032fa0, 0x10b3268)
/usr/local/Cellar/go/1.8.1/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc420032f58 sp=0xc420032eb8
main.add_to_map(0xc420016270, 0x10b1ba0, 0x3, 0x0)
/tmp/golang-w-python/try_asynchronous.go:10 +0xa3 fp=0xc420032fc0 sp=0xc420032f58
runtime.goexit()
/usr/local/Cellar/go/1.8.1/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420032fc8 sp=0xc420032fc0
created by main.main
/tmp/golang-w-python/try_asynchronous.go:19 +0xc8
goroutine 1 [runnable]:
fmt.(*pp).fmtString(0xc42001e0c0, 0x10b1f52, 0x7, 0x76)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:424 +0x1a2
fmt.(*pp).printValue(0xc42001e0c0, 0x10953c0, 0xc42000e260, 0x98, 0x76, 0x1)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:729 +0x27aa
fmt.(*pp).printValue(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x15, 0x76, 0x0)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:750 +0x103d
fmt.(*pp).printArg(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x76)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:682 +0x217
fmt.(*pp).doPrintln(0xc42001e0c0, 0xc420045f28, 0x1, 0x1)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:1138 +0xa1
fmt.Fprintln(0x1108140, 0xc42000c018, 0xc420045f28, 0x1, 0x1, 0xc420045ef0, 0xc420045ee0, 0x1087218)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:247 +0x5c
fmt.Println(0xc420045f28, 0x1, 0x1, 0x10b1e6f, 0x6, 0x0)
/usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:257 +0x57
main.main()
/tmp/golang-w-python/try_asynchronous.go:21 +0x132
exit status 2
Since I need to run many queries at once and keep track of the results by name, I expected to be able to write to a map from the goroutines. But the runtime reports "fatal error: concurrent map writes", which says you can't.
I don't understand why.
EDIT:
The closest thing I have that returns results is not asynchronous:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var mutex sync.Mutex
var wg sync.WaitGroup
func random_sleep() {
r := rand.Intn(3000)
time.Sleep(time.Duration(r) * time.Millisecond)
}
func add_to_map(m map[string] string, word string) {
defer wg.Done()
added_word := word + " plus more letters"
mutex.Lock()
defer mutex.Unlock()
fmt.Println("Before sleep")
random_sleep()
m[word] = added_word
fmt.Println("Added word %v", word)
}
func main() {
words_map := make(map[string]string)
words := []string{"giraffe", "cat", "dog", "turtle"}
for _, this_word := range words {
wg.Add(1)
go add_to_map(words_map, this_word)
}
wg.Wait()
fmt.Println(words_map)
}
Results are wrong:
cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query)
$ go run try_async.go
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]
cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query)
$ go run try_async.go
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]
The run should be fast, taking no longer than 3 seconds (the maximum random sleep, I think).
Expected output:
Before sleep
Before sleep
Before sleep
Before sleep
Added word %v cat
Added word %v giraffe
Added word %v turtle
Added word %v dog
Upvotes: 2
Views: 2246
Reputation: 556
OK, let me clarify some things and help you.
You don't need to return the modified map from this function, because the function receives a reference to the map, not a copy of it. (Let's set aside the fact that the caller ignores the return value entirely.)
func add_to_map(m map[string] string, word string) map[string]string {
added_word := word + " plus more letters"
m[word] = added_word
return m
}
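A minimal sketch of that point: the map value passed to a function refers to the same underlying data as the caller's map, so an insert made inside the function is visible to the caller without returning anything. The helper name addWord is made up for this illustration.

```go
package main

import "fmt"

// addWord is a hypothetical helper: it writes into the caller's map
// and returns nothing.
func addWord(m map[string]string, word string) {
	m[word] = word + " plus more letters"
}

func main() {
	words := make(map[string]string)
	addWord(words, "cat")
	// The caller sees the entry even though nothing was returned.
	fmt.Println(words["cat"]) // prints "cat plus more letters"
}
```

This only covers single-goroutine use; calling it from several goroutines at once still needs synchronization.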
Next, you need to synchronize access to the map. You can use a mutex for this.
import "sync"
var mutex sync.Mutex // global variable, but it could also be created locally
func add_to_map(m map[string]string, word string) {
added_word := word + " plus more letters"
// do the long-running computation here, before taking the lock
mutex.Lock() // result is ready, so lock the mutex
defer mutex.Unlock() // unlock the mutex when the function returns
m[word] = added_word // write the result to the shared map
}
Note that Go 1.9 adds a concurrent map type, sync.Map.
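For reference, here is a rough sketch of what the same word example might look like with the standard library's sync.Map (available since Go 1.9). The helper name addAll is an assumption made for this example, not something from the question.

```go
package main

import (
	"fmt"
	"sync"
)

// addAll is a hypothetical helper that stores one entry per word from
// its own goroutine and returns once every goroutine has finished.
func addAll(words []string) *sync.Map {
	m := &sync.Map{}
	var wg sync.WaitGroup
	for _, w := range words {
		wg.Add(1)
		go func(word string) {
			defer wg.Done()
			// Store is safe for concurrent use, so no explicit mutex is needed.
			m.Store(word, word+" plus more letters")
		}(w)
	}
	wg.Wait()
	return m
}

func main() {
	m := addAll([]string{"giraffe", "cat", "dog", "turtle"})
	if v, ok := m.Load("cat"); ok {
		fmt.Println(v) // prints "cat plus more letters"
	}
}
```

The sync.Map documentation describes it as a specialized type and suggests that most code should use a plain map with separate locking, so the mutex version is probably still the better default for a handful of queries.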
Edit:
You need to wait for all goroutines to finish, because your main() currently returns before they do. You can do this with a sync.WaitGroup:
package main
import (
"fmt"
"sync"
)
var mutex sync.Mutex
var wg sync.WaitGroup
func add_to_map(m map[string] string, word string) {
defer wg.Done()
added_word := word + " plus more letters"
// do heavy work here
//
mutex.Lock()
defer mutex.Unlock()
m[word] = added_word
}
func main() {
words_map := make(map[string]string)
words := []string{"giraffe", "cat", "dog", "turtle"}
for _, this_word := range words {
wg.Add(1)
go add_to_map(words_map, this_word)
}
wg.Wait()
fmt.Println(words_map)
}
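As an alternative to the mutex (a different technique, not what the code above does), the goroutines could send their results over a channel so that only one goroutine ever writes to the map. This is just a sketch; the result type and the collect function are names invented for the example.

```go
package main

import "fmt"

// result pairs a word with its computed value (hypothetical type).
type result struct {
	key   string
	value string
}

// collect starts one goroutine per word and gathers the results over a
// channel, so only the calling goroutine ever writes to the map.
func collect(words []string) map[string]string {
	results := make(chan result)
	for _, w := range words {
		go func(word string) {
			// heavy work would go here
			results <- result{key: word, value: word + " plus more letters"}
		}(w)
	}
	out := make(map[string]string, len(words))
	// Receive exactly one result per goroutine started above.
	for range words {
		r := <-results
		out[r.key] = r.value
	}
	return out
}

func main() {
	fmt.Println(collect([]string{"giraffe", "cat", "dog", "turtle"}))
}
```

Receiving one value per started goroutine also takes the place of the WaitGroup here, since collect cannot return until every goroutine has sent its result.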
Upvotes: 1
Reputation: 20469
(Posted solution on behalf of the OP).
My use of the fake delay was wrong: I was sleeping while holding the mutex, so the goroutines ran one at a time. Both solutions work. Thank you:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var mutex sync.Mutex
var wg sync.WaitGroup
func random_sleep() {
r := rand.Intn(3000)
time.Sleep(time.Duration(r) * time.Millisecond)
}
func add_to_map(m map[string] string, word string) {
defer wg.Done()
added_word := word + " plus more letters"
fmt.Println("Before sleep")
random_sleep()
mutex.Lock()
defer mutex.Unlock()
m[word] = added_word
fmt.Println("Added word %v", word)
}
func main() {
words_map := make(map[string]string)
words := []string{"giraffe", "cat", "dog", "turtle"}
for _, this_word := range words {
wg.Add(1)
go add_to_map(words_map, this_word)
}
wg.Wait()
fmt.Println(words_map)
}
Result:
$ go run try_async.go
Before sleep
Before sleep
Before sleep
Before sleep
Added word %v dog
Added word %v giraffe
Added word %v cat
Added word %v turtle
map[turtle:turtle plus more letters dog:dog plus more letters giraffe:giraffe plus more letters cat:cat plus more letters]
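To see why the position of the sleep relative to mutex.Lock() matters, here is a rough timing sketch. It uses a fixed delay instead of a random one so the two runs are comparable; the run function and its sleepInsideLock flag are invented for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// run starts n goroutines that each sleep for delay and then record
// their index in a shared map. When sleepInsideLock is true the sleep
// happens while the mutex is held, as in the earlier attempt.
func run(n int, delay time.Duration, sleepInsideLock bool) time.Duration {
	var mu sync.Mutex
	var wg sync.WaitGroup
	done := make(map[int]bool)
	start := time.Now()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if !sleepInsideLock {
				time.Sleep(delay) // slow work runs concurrently
			}
			mu.Lock()
			defer mu.Unlock()
			if sleepInsideLock {
				time.Sleep(delay) // slow work is serialized by the lock
			}
			done[id] = true
		}(i)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	delay := 100 * time.Millisecond
	fmt.Println("sleep inside lock: ", run(4, delay, true))
	fmt.Println("sleep outside lock:", run(4, delay, false))
}
```

With the sleep inside the lock, the total should be at least four times the delay, because each goroutine has to wait for the previous one to release the mutex. With the sleep outside, it should be close to a single delay.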
Upvotes: 0
Reputation: 12393
You have two different issues in your code:
1) Even if you are always writing to different keys, you can't do that simultaneously without locking the map: https://golang.org/doc/faq#atomic_maps
So, you need to just make sure you get exclusive access to the map when accessing it.
2) You need to wait for all goroutines to finish before printing the map (that's why you get inconsistent results in your edited code)
A simple way to solve both issues based on your example:
package main
import (
"fmt"
"sync"
)
var mutex sync.Mutex
var wg sync.WaitGroup
func add_to_map(m map[string] string, word string) {
defer wg.Done()
added_word := word + " plus more letters"
mutex.Lock()
defer mutex.Unlock()
m[word] = added_word
}
func main() {
words_map := make(map[string]string)
words := []string{"giraffe", "cat", "dog", "turtle"}
for _, this_word := range words {
wg.Add(1)
go add_to_map(words_map, this_word)
}
wg.Wait()
fmt.Println(words_map)
}
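Applied to the query-name-to-results shape from the question, the same mutex plus WaitGroup pattern might look roughly like this. The runAll function, its run parameter, and fakeQuery are all placeholders invented for the sketch; the real BigQuery call is not shown anywhere in the thread.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll runs every named query in its own goroutine and returns the
// results keyed by query name. The run parameter stands in for the real
// BigQuery call, which is not shown in the thread.
func runAll(queries map[string]string, run func(sql string) []int) map[string][]int {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	results := make(map[string][]int, len(queries))
	for name, sql := range queries {
		wg.Add(1)
		go func(name, sql string) {
			defer wg.Done()
			rows := run(sql) // slow call, made without holding the lock
			mu.Lock()
			results[name] = rows
			mu.Unlock()
		}(name, sql)
	}
	wg.Wait()
	return results
}

func main() {
	queries := map[string]string{
		"reports_portal": "select * from reports_portal",
		"billing_portal": "select * from billing_portal",
	}
	// fakeQuery is a placeholder that returns the length of the SQL text.
	fakeQuery := func(sql string) []int { return []int{len(sql)} }
	fmt.Println(runAll(queries, fakeQuery))
}
```

Keeping the mutex and WaitGroup local to runAll, rather than global, means several batches can run at once without sharing a lock.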
Upvotes: 1