cLee
cLee

Reputation: 53

concurrency and timeout in Go

This Go program:

package main

import (
    "fmt"
    "time"
)

func main() {
    start := time.Now()

    sleep_durations := []int{8100, 1000, 2500, 500, 6000}
    // sleep_durations := []int{8100, 1000, 2500, 500}
    c := make(chan string)
    defer close(c) // close channel when main exits
    for index, duration := range sleep_durations {
        go sleepy(fmt.Sprintf("sleepy%d: ", index+1), duration, c)
    }
    fmt.Printf("starting %d sleepys\n", len(sleep_durations))

    for range sleep_durations {
        select {
        case msg := <-c:
            fmt.Println("received: ", msg)
        case <-time.After(time.Second * time.Duration(5)):
            fmt.Println("*** TIMEOUT ***")
        }
    }

    elapsed := time.Since(start)
    fmt.Printf("... %d sleepys ran in: %e\n", len(sleep_durations), elapsed.Seconds())
}

func sleepy(msg string, sleep_ms int, yawn chan string) {
    start := time.Now()
    sleep := time.Duration(sleep_ms) * time.Millisecond
    time.Sleep(sleep) // some sleepy work
    yawn <- fmt.Sprintf("%s slept for %s", msg, sleep)
    elapsed := time.Since(start)
    fmt.Printf("\t%s finished in: %s\n", msg, elapsed)
}

https://play.golang.org/p/0ioTuKv230

has confusing results. When line 11 is uncommented it does not work as expected, i.e. the time.After 5 seconds doesn't happen. But with line 11 commented and line 12 uncommented the timeout does work as expected. I'm new to Go, but what am I missing?

Upvotes: 1

Views: 636

Answers (2)

cLee
cLee

Reputation: 53

I found what I was looking for:

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  var wg sync.WaitGroup
  done := make(chan struct{})
  wq := make(chan interface{})
  worker_count := 2

  for i := 0; i < worker_count; i++ {
    wg.Add(1)
    go doit(i, wq, done, &wg)
  }

  fmt.Printf("doing work\n")
  for i := 0; i < worker_count; i++ {
    time.Sleep(time.Millisecond * time.Duration(100))
    wq <- fmt.Sprintf("worker: %d", i)
  }

  fmt.Printf("closing 'done' channel\n")
  close(done)
  fmt.Printf("block/wait until all workers are done\n")
  wg.Wait()
  fmt.Println("all done!")
}

func doit(worker_id int, wq <-chan interface{}, done <-chan struct{}, wg *sync.WaitGroup) {
  fmt.Printf("[%v] is working\n", worker_id)
  defer wg.Done()
  max_time := time.Second * time.Duration(5)
  for {
    select {
    case m := <-wq:
      fmt.Printf("[%v] m => %v\n", worker_id, m)
    case <-done:
      fmt.Printf("[%v] is done\n", worker_id)
      return
    case <-time.After(max_time):
      fmt.Printf("timeout > %s seconds!\n", max_time)
    }
  }
}

Upvotes: -1

Dave C
Dave C

Reputation: 7878

The timeout is per-select invocation, if it keeps getting something from the channel once every 5 seconds (or less) it will keep going indefinitely.

If you meant for the timeout to apply to all operations then do so like so:

    t := time.After(5 * time.Second)
    for range sleep_durations {
        select {
        case msg := <-c:
            fmt.Println("received: ", msg)
        case <-t:
            fmt.Println("*** TIMEOUT ***")
        }
    }

https://play.golang.org/p/r0_PyeE3bx

Upvotes: 2

Related Questions