Reputation: 12740
Code below works fine with hard coded JSON data however doesn't work when I read JSON data from a file. I'm getting fatal error: all goroutines are asleep - deadlock
error when using sync.WaitGroup
.
WORKING EXAMPLE WITH HARD-CODED JSON DATA:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
)
func connect(host string) {
cmd := exec.Command("ssh", host, "uptime")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
fmt.Println(err)
}
fmt.Printf("%s: %q\n", host, out.String())
time.Sleep(time.Second * 2)
fmt.Printf("%s: DONE\n", host)
}
func listener(c chan string) {
for {
host := <-c
go connect(host)
}
}
func main() {
hosts := [2]string{"[email protected]", "[email protected]"}
var c chan string = make(chan string)
go listener(c)
for i := 0; i < len(hosts); i++ {
c <- hosts[i]
}
var input string
fmt.Scanln(&input)
}
OUTPUT:
user@user-VirtualBox:~/go$ go run channel.go
[email protected]: " 09:46:40 up 86 days, 18:16, 0 users, load average: 5"
[email protected]: " 09:46:40 up 86 days, 17:27, 1 user, load average: 9"
[email protected]: DONE
[email protected]: DONE
NOT WORKING - EXAMPLE WITH READING JSON DATA FILE:
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
"encoding/json"
"os"
"sync"
)
func connect(host string) {
cmd := exec.Command("ssh", host, "uptime")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
fmt.Println(err)
}
fmt.Printf("%s: %q\n", host, out.String())
time.Sleep(time.Second * 2)
fmt.Printf("%s: DONE\n", host)
}
func listener(c chan string) {
for {
host := <-c
go connect(host)
}
}
type Content struct {
Username string `json:"username"`
Ip string `json:"ip"`
}
func main() {
var wg sync.WaitGroup
var source []Content
var hosts []string
data := json.NewDecoder(os.Stdin)
data.Decode(&source)
for _, value := range source {
hosts = append(hosts, value.Username + "@" + value.Ip)
}
var c chan string = make(chan string)
go listener(c)
for i := 0; i < len(hosts); i++ {
wg.Add(1)
c <- hosts[i]
defer wg.Done()
}
var input string
fmt.Scanln(&input)
wg.Wait()
}
OUTPUT
user@user-VirtualBox:~/go$ go run deploy.go < hosts.txt
[email protected]: " 09:46:40 up 86 days, 18:16, 0 users, load average: 5"
[email protected]: " 09:46:40 up 86 days, 17:27, 1 user, load average: 9"
[email protected] : DONE
[email protected]: DONE
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc210000068)
/usr/lib/go/src/pkg/runtime/sema.goc:199 +0x30
sync.(*WaitGroup).Wait(0xc210047020)
/usr/lib/go/src/pkg/sync/waitgroup.go:127 +0x14b
main.main()
/home/user/go/deploy.go:64 +0x45a
goroutine 3 [chan receive]:
main.listener(0xc210038060)
/home/user/go/deploy.go:28 +0x30
created by main.main
/home/user/go/deploy.go:53 +0x30b
exit status 2
user@user-VirtualBox:~/go$
HOSTS.TXT
[
{
"username":"user1",
"ip":"111.79.154.111"
},
{
"username":"user2",
"ip":"111.79.190.222"
}
]
Upvotes: 42
Views: 87033
Reputation: 1318
Arrived here from a search engine. Adding here for someone landing the same way. I, too, was getting fatal error: all goroutines are asleep - deadlock!
. In my case, I declared channels globally and did not initialize. Go did not complain at compile time and all goroutines went to sleep writing to/waiting for non-existent channels.
make(chan <type>)
solved my problem.
Upvotes: 0
Reputation: 47
package main
/*
Simulation for sending messages from threads for processing,
and getting a response (processing result) to the thread
*/
import (
"fmt"
"math/rand"
"time"
)
type (
TChans []chan TMsgRec
TMsgRec struct {
name string //channel name
rid int //-1 or index of response channel in TChans
msg string // message
note string // comment
}
TThRec struct { // for thread
name string
rid int // index of response channel in TChans (or -1)
job chan TMsgRec // chanel for send message to Receiver
resp chan TMsgRec // response channel back to thread
}
)
func main() {
index := -1
Chans := make(TChans, 100)
index = NewChanIndex(&Chans)
Job := Chans[index] // channel for send message from threads to Receiver
index = NewChanIndex(&Chans) // channel index for response, for the thread "1th"
go ping(TThRec{name: "1th", job: Job, rid: index, resp: Chans[index]})
index = NewChanIndex(&Chans) // channel index for response, for the thread "2th"
go ping(TThRec{name: "2th", job: Job, rid: index, resp: Chans[index]})
Receiver(Job, Chans)
}
func Receiver(c chan TMsgRec, pChans TChans) {
var v TMsgRec
for {
select {
case v = <-c: // receive message
{
if v.rid > -1 {
//pChans[v.rid] <- TMsgRec{name: v.name, rid: -1, msg: fmt.Sprint(v.msg, ":receiver "), note: ""}
go worker(v, pChans[v.rid])
}
}
default:
{
//fmt.Println("receiver")
SleepM(2)
}
}
}
}
func worker(v TMsgRec, r chan TMsgRec) {
// simulation SQL query, or auther process
SleepM(rand.Intn(50))
v.msg = v.msg + ":worker"
r <- v
}
func waitResponse(d chan TMsgRec, pTimeout int) (bool, TMsgRec) {
var v TMsgRec
for {
select {
case v = <-d:
{
return true, v
}
case <-time.After(10 * time.Second):
{
return false, v
}
}
}
}
func ping(pParam TThRec) {
SleepM(10)
var v TMsgRec
ok := true
i := 0
for i < 500 {
if ok {
ok = false
pParam.job <- TMsgRec{name: pParam.name, rid: pParam.rid, msg: fmt.Sprint(i), note: ""}
i++
}
if pParam.rid > -1 {
if !ok {
ok, v = waitResponse(pParam.resp, 10)
if ok {
fmt.Println(v.name, v.msg)
SleepM(1)
} else {
fmt.Println(pParam.name, "response timeout")
}
}
} else {
SleepM(1)
}
}
fmt.Println(v.name, "-- end --")
}
func NewChanIndex(pC *TChans) int {
for i, v := range *pC {
if v == nil {
(*pC)[i] = make(chan TMsgRec)
return i
}
}
return -1
}
func FreeRespChan(pC *TChans, pIndex int) {
if (*pC)[pIndex] != nil {
close((*pC)[pIndex]) //close channel
(*pC)[pIndex] = nil
}
}
func SleepM(pMilliSec int) { // sleep millisecounds
time.Sleep(time.Duration(pMilliSec) * time.Millisecond)
}
Upvotes: 1
Reputation: 1
try this code snippest
package main
import (
"bytes"
"fmt"
"os/exec"
"time"
"sync"
)
func connect(host string, wg *sync.WaitGroup) {
defer wg.Done()
cmd := exec.Command("ssh", host, "uptime")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
fmt.Println(err)
}
fmt.Printf("%s: %q\n", host, out.String())
time.Sleep(time.Second * 2)
fmt.Printf("%s: DONE\n", host)
}
func listener(c chan string,wg *sync.WaitGroup) {
for {
host,ok := <-c
// check channel is closed or not
if !ok{
break
}
go connect(host)
}
}
func main() {
var wg sync.WaitGroup
hosts := [2]string{"[email protected]", "[email protected]"}
var c chan string = make(chan string)
go listener(c)
for i := 0; i < len(hosts); i++ {
wg.Add(1)
c <- hosts[i]
}
close(c)
var input string
fmt.Scanln(&input)
wg.Wait()
}
Upvotes: -2
Reputation: 51
Thanks for the very nice and detailed explanation Grzegorz Żur.
One thing that I want to point it out that typically the func that needs to be threaded wont be in main()
, so we would have something like this:
package main
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"reflect"
"regexp"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // VERY IMP to declare this globally, other wise one //would hit "fatal error: all goroutines are asleep - deadlock!"
func doSomething(arg1 arg1Type) {
// cured cancer
}
func main() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randTime := r.Intn(10)
wg.Add(1)
go doSomething(randTime)
wg.Wait()
fmt.Println("Waiting for all threads to finish")
}
The thing that I want to point it out is that global declaration of wg
is very crucial for all threads to finish before main()
Upvotes: 5
Reputation: 49241
Go program ends when the main function ends.
From the language specification
Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.
Therefore, you need to wait for your goroutines to finish. The common solution for this is to use sync.WaitGroup object.
The simplest possible code to synchronize goroutine:
package main
import "fmt"
import "sync"
var wg sync.WaitGroup // 1
func routine() {
defer wg.Done() // 3
fmt.Println("routine finished")
}
func main() {
wg.Add(1) // 2
go routine() // *
wg.Wait() // 4
fmt.Println("main finished")
}
And for synchronizing multiple goroutines
package main
import "fmt"
import "sync"
var wg sync.WaitGroup // 1
func routine(i int) {
defer wg.Done() // 3
fmt.Printf("routine %v finished\n", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 2
go routine(i) // *
}
wg.Wait() // 4
fmt.Println("main finished")
}
WaitGroup usage in order of execution.
* The actual parameters are evaluated before starting new gouroutine. Thus it is needed to evaluate them explicitly before wg.Add(1)
so the possibly panicking code would not leave increased counter.
Use
param := f(x)
wg.Add(1)
go g(param)
instead of
wg.Add(1)
go g(f(x))
Upvotes: 70