codyc4321

Reputation: 9682

Run Go asynchronous operations and write to map

I have a project that tries to run an arbitrary number of BigQuery queries at the same time in Go. The parent project is all Python. I need to keep track of each query's results by name, for example in a map.

Input:

{
 'reports_portal': 'select * from reports_portal',
 'billing_portal': 'select * from billing_portal',
}

Output:

{
 'reports_portal': [23, 123, 5234, 632],
 'billing_portal': [23, 123, 5234, 632],
}

and so on

These BigQuery queries need to run asynchronously because they're very slow (from a UI perspective, an SRE is waiting 15-30 seconds for results).

I first tried writing items to a map asynchronously:

package main

import (
    "fmt"
)


func add_to_map(m map[string] string, word string) map[string]string {
    added_word := word + " plus more letters"
    m[word] = added_word
    return m
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        go add_to_map(words_map, this_word)
    }
    fmt.Println(words_map)
}

It blows up like this:

$ go run try_asynchronous.go 
fatal error: concurrent map writes

goroutine 7 [running]:
runtime.throw(0x10b3b96, 0x15)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420032eb8 sp=0xc420032e98
runtime.mapassign(0x109ad20, 0xc420016270, 0xc420032fa0, 0x10b3268)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc420032f58 sp=0xc420032eb8
main.add_to_map(0xc420016270, 0x10b1ba0, 0x3, 0x0)
    /tmp/golang-w-python/try_asynchronous.go:10 +0xa3 fp=0xc420032fc0 sp=0xc420032f58
runtime.goexit()
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420032fc8 sp=0xc420032fc0
created by main.main
    /tmp/golang-w-python/try_asynchronous.go:19 +0xc8

goroutine 1 [runnable]:
fmt.(*pp).fmtString(0xc42001e0c0, 0x10b1f52, 0x7, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:424 +0x1a2
fmt.(*pp).printValue(0xc42001e0c0, 0x10953c0, 0xc42000e260, 0x98, 0x76, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:729 +0x27aa
fmt.(*pp).printValue(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x15, 0x76, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:750 +0x103d
fmt.(*pp).printArg(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:682 +0x217
fmt.(*pp).doPrintln(0xc42001e0c0, 0xc420045f28, 0x1, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:1138 +0xa1
fmt.Fprintln(0x1108140, 0xc42000c018, 0xc420045f28, 0x1, 0x1, 0xc420045ef0, 0xc420045ee0, 0x1087218)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:247 +0x5c
fmt.Println(0xc420045f28, 0x1, 0x1, 0x10b1e6f, 0x6, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:257 +0x57
main.main()
    /tmp/golang-w-python/try_asynchronous.go:21 +0x132
exit status 2

Since I need to run many queries at once and keep track of the results by name, I expected to be able to write to a map from the goroutines. But the error "fatal error: concurrent map writes" says you can't.

I don't understand

  1. why not
  2. what I should do to run these BigQuery queries simultaneously.

EDIT:

The closest thing I have, that returns results, is not asynchronous:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func random_sleep() {
    r := rand.Intn(3000)
    time.Sleep(time.Duration(r) * time.Millisecond)
}

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    mutex.Lock()
    defer mutex.Unlock()
    fmt.Println("Before sleep")
    random_sleep()
    m[word] = added_word
    fmt.Println("Added word %v", word)
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Results are wrong:

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

The run should be fast, taking no longer than 3 seconds (the maximum random sleep, I think).

Expected output:

Before sleep
Before sleep
Before sleep
Before sleep
Added word %v cat
Added word %v giraffe
Added word %v turtle
Added word %v dog

Upvotes: 2

Views: 2246

Answers (3)

Radosław Załuska

Reputation: 556

OK let me clarify some things and help you.

You don't need to return the modified map from this function, because the function receives a reference to the map, not a copy of it. (Let's set aside the fact that the return value is being ignored entirely.)

func add_to_map(m map[string] string, word string) map[string]string {
    added_word := word + " plus more letters"
    m[word] = added_word
    return m
}
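
A quick way to convince yourself of this is the small sketch below. The function name addEntry is made up for the illustration; it returns nothing, yet the caller still sees the new entry.

```go
package main

import "fmt"

// addEntry (hypothetical name) writes into the map it is given.
// It returns nothing: the caller's map and m share the same underlying data.
func addEntry(m map[string]string, word string) {
	m[word] = word + " plus more letters"
}

func main() {
	words := make(map[string]string)
	addEntry(words, "cat")
	fmt.Println(words["cat"]) // prints "cat plus more letters"
}
```

This only shows the sharing behaviour in a single goroutine; it says nothing yet about concurrent access.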

The next thing is that you need to synchronize access to the map. You can use a mutex for this.

import "sync"

var mutex sync.Mutex // global variable, but it can also be created as a local

func add_to_map(m map[string] string, word string) {
    added_word := word + " plus more letters"
    // do the long-running computation here, before taking the lock,
    // and calculate the result
    mutex.Lock() // result is ready, so lock the mutex
    defer mutex.Unlock() // unlock the mutex when the function returns
    m[word] = added_word // write the result to the shared map
}

Note that Go 1.9 adds a concurrent map type, sync.Map, to the standard library.
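
For reference, here is a minimal sketch of that concurrent map type, assuming Go 1.9 or later where it is available as sync.Map. The function name addWords is made up for the illustration, and this is just one possible way to use it, not a replacement for the mutex version above.

```go
package main

import (
	"fmt"
	"sync"
)

// addWords (hypothetical name) stores one entry per word in a sync.Map,
// using one goroutine per word, and returns once every goroutine is done.
func addWords(words []string) *sync.Map {
	m := &sync.Map{}
	var wg sync.WaitGroup
	for _, w := range words {
		wg.Add(1)
		go func(word string) {
			defer wg.Done()
			// Store is safe to call from many goroutines without extra locking.
			m.Store(word, word+" plus more letters")
		}(w)
	}
	wg.Wait()
	return m
}

func main() {
	m := addWords([]string{"giraffe", "cat", "dog", "turtle"})
	// Range visits every entry; the iteration order is not specified.
	m.Range(func(key, value interface{}) bool {
		fmt.Printf("%v: %v\n", key, value)
		return true
	})
}
```

Keys and values come back as interface{}, so reading them usually needs a type assertion. For a handful of writes like this, a plain map with a mutex is likely just as good.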

Edit: You need to wait for all goroutines to finish, because your main() now returns before they do. You can do this by using a WaitGroup:

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    // do heavy work here
    //
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Upvotes: 1

halfer

Reputation: 20469

(Posted solution on behalf of the OP).

My use of the fake delay was wrong: I was sleeping while holding the mutex, so the goroutines ran one after another. Both solutions work. Thank you:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func random_sleep() {
    r := rand.Intn(3000)
    time.Sleep(time.Duration(r) * time.Millisecond)
}


func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    fmt.Println("Before sleep")
    random_sleep()
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
    fmt.Println("Added word %v", word)
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Result:

$ go run try_async.go 
Before sleep
Before sleep
Before sleep
Before sleep
Added word %v dog
Added word %v giraffe
Added word %v cat
Added word %v turtle
map[turtle:turtle plus more letters dog:dog plus more letters giraffe:giraffe plus more letters cat:cat plus more letters]
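
To make the difference measurable, here is a rough sketch that times both placements of the sleep relative to mutex.Lock(). The helper name run, the sleepInsideLock flag, and the fixed 50 ms delay are all made up for the comparison; they are not part of the code above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// run (hypothetical helper) starts one goroutine per word and returns how
// long it took for all of them to finish. When sleepInsideLock is true the
// simulated work happens while holding the mutex, so goroutines take turns;
// otherwise they all sleep at the same time and only the map write is locked.
func run(words []string, delay time.Duration, sleepInsideLock bool) time.Duration {
	var mu sync.Mutex
	var wg sync.WaitGroup
	m := make(map[string]string)
	start := time.Now()
	for _, w := range words {
		wg.Add(1)
		go func(word string) {
			defer wg.Done()
			if !sleepInsideLock {
				time.Sleep(delay) // slow work done before taking the lock
			}
			mu.Lock()
			defer mu.Unlock()
			if sleepInsideLock {
				time.Sleep(delay) // slow work done while holding the lock
			}
			m[word] = word + " plus more letters"
		}(w)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	words := []string{"giraffe", "cat", "dog", "turtle"}
	delay := 50 * time.Millisecond
	fmt.Println("sleep inside lock: ", run(words, delay, true))
	fmt.Println("sleep outside lock:", run(words, delay, false))
}
```

With the sleep inside the lock, the total should come out to at least four times the delay; with it outside, roughly one delay, since the sleeps overlap.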

Upvotes: 0

eugenioy

Reputation: 12393

You have two different issues in your code:

1) Even if you are always writing to different keys, you can't do that simultaneously without locking the map: https://golang.org/doc/faq#atomic_maps

So, you need to just make sure you get exclusive access to the map when accessing it.

2) You need to wait for all goroutines to finish before printing the map (that's why you get inconsistent results in your edited code).

A simple way to solve both issues based on your example:

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}
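
Carried back to the shape of the input and output in the question, the same pattern might look like the sketch below. Note that runQuery is a hypothetical stand-in that returns dummy data, not a real BigQuery call, and runAll is likewise a made-up name; the mutex and WaitGroup are locals here rather than globals.

```go
package main

import (
	"fmt"
	"sync"
)

// runQuery is a hypothetical placeholder for the real (slow) query call.
// It returns the length of the SQL text so there is something to store.
func runQuery(sql string) []int {
	return []int{len(sql)}
}

// runAll (hypothetical name) runs each query in its own goroutine and
// collects the results in a map keyed by the query's name.
func runAll(queries map[string]string) map[string][]int {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	results := make(map[string][]int, len(queries))
	for name, sql := range queries {
		wg.Add(1)
		go func(name, sql string) {
			defer wg.Done()
			rows := runQuery(sql) // slow part, done without holding the lock
			mu.Lock()
			results[name] = rows // only the map write is protected
			mu.Unlock()
		}(name, sql)
	}
	wg.Wait() // do not read results until every goroutine has finished
	return results
}

func main() {
	results := runAll(map[string]string{
		"reports_portal": "select * from reports_portal",
		"billing_portal": "select * from billing_portal",
	})
	fmt.Println(results)
}
```

Error handling is left out to keep the sketch short; a real query function would presumably return an error as well, which could be stored alongside the rows.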

Upvotes: 1
