Debashish

Reputation: 1185

Timer reset in a separate goroutine

In the following scenario, a network entity always waits TimeOutTime seconds before doing a particular task X. Call this timer TimerT. If the entity receives a set of external messages during this wait, it should reset TimerT to TimeOutTime again. If no external messages are received, the expected behaviour is as follows:

  1. Timer Expired
  2. Do task X
  3. Reset the Timer again to TimeOutTime

(By reset I mean: stop the timer and start it over again.)


To simulate the scenario I wrote the following code in Go.

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
    rand.Seed(time.Now().UTC().UnixNano())
    var wg sync.WaitGroup
    t := time.NewTimer(time.Second * time.Duration(TimeOutTime))
    wg.Add(1)
    // go routine for doing timeout event
    go func() {
        defer wg.Done()
        for {
            t1 := time.Now()
            <-t.C
            t2 := time.Now()
            // Do.. task X .. on timeout...
            log.Println("Timeout after ", t2.Sub(t1))
            t.Reset(time.Second * time.Duration(TimeOutTime))
        }
    }()

    // go routine to simulate incoming messages ...
    // second go routine
    go func() {
        for {
            // simulates an incoming message at any time
            time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))

            // once any message is received reset the timer to TimeOutTime seconds again
            t.Reset(time.Second * time.Duration(TimeOutTime))
        }
    }()

    wg.Wait()
}

Running this program with the -race flag reports a data race:

==================
WARNING: DATA RACE
Write at 0x00c0000c2068 by goroutine 8:
  time.(*Timer).Reset()
      /usr/local/go/src/time/sleep.go:125 +0x98
  main.main.func1()
      /home/deka/Academic/go/src/main/test.go:29 +0x18f

Previous write at 0x00c0000c2068 by goroutine 9:
  time.(*Timer).Reset()
      /usr/local/go/src/time/sleep.go:125 +0x98
  main.main.func2()
      /home/deka/Academic/go/src/main/test.go:42 +0x80

Goroutine 8 (running) created at:
  main.main()
      /home/deka/Academic/go/src/main/test.go:20 +0x1d3

Goroutine 9 (running) created at:
  main.main()
      /home/deka/Academic/go/src/main/test.go:35 +0x1f5
==================

Then I wrapped both Reset() calls in a Mutex.

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
    rand.Seed(time.Now().UTC().UnixNano())
    var wg sync.WaitGroup
    t := time.NewTimer(time.Second * time.Duration(TimeOutTime))
    wg.Add(1)
    var mu sync.Mutex
    // go routine for doing timeout event
    go func() {
        defer wg.Done()
        for {
            t1 := time.Now()
            <-t.C
            t2 := time.Now()
            // Do.. task X .. on timeout...
            log.Println("Timeout after ", t2.Sub(t1))
            mu.Lock()
            t.Reset(time.Second * time.Duration(TimeOutTime))
            mu.Unlock()
        }
    }()

    // go routine to simulate incoming messages ...
    // second go routine
    go func() {
        for {
            // simulates an incoming message at any time
            time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))

            // once any message is received reset the timer to TimeOutTime seconds again
            mu.Lock()
            t.Reset(time.Second * time.Duration(TimeOutTime))
            mu.Unlock()
        }
    }()

    wg.Wait()
}

After this change, the code seems to work fine, based on the following observation.

If I replace the line

time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))

in the second goroutine with a constant sleep of 4 seconds, while TimeOutTime stays constant at 3 seconds, the output of the program is:

2020/02/29 20:10:11 Timeout after  3.000160828s
2020/02/29 20:10:15 Timeout after  4.000444017s
2020/02/29 20:10:19 Timeout after  4.000454657s
2020/02/29 20:10:23 Timeout after  4.000304877s

In the above execution, the second goroutine resets the active timer one second after each timeout, when the timer has already run for one of its three seconds. As a result, from the second print onward, the timer expires 4 seconds after the previous timeout.

Now when I checked the documentation of Reset() I found the following:

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.



// Reset changes the timer to expire after duration d.
// It returns true if the timer had been active, false if the timer had
// expired or been stopped.
//
// Reset should be invoked only on stopped or expired timers with drained channels.
// If a program has already received a value from t.C, the timer is known
// to have expired and the channel drained, so t.Reset can be used directly.
// If a program has not yet received a value from t.C, however,
// the timer must be stopped and—if Stop reports that the timer expired
// before being stopped—the channel explicitly drained:
//
//  if !t.Stop() {
//      <-t.C
//  }
//  t.Reset(d)
//
// This should not be done concurrent to other receives from the Timer's
// channel.
//
// Note that it is not possible to use Reset's return value correctly, as there
// is a race condition between draining the channel and the new timer expiring.
// Reset should always be invoked on stopped or expired channels, as described above.
// The return value exists to preserve compatibility with existing programs.

I found this diagram (link: https://blogtitle.github.io/go-advanced-concurrency-patterns-part-2-timers/):

Time Diagram GoLang

With the diagram in mind, it seems that I need to use

if !t.Stop() {
    <-t.C
}
t.Reset(d)

in the second goroutine. In this case I also need proper locking in both goroutines to avoid waiting forever on the channel.

I don't understand the scenario in which t.Stop() followed by draining the channel (<-t.C) should be performed. In which case is it required? In my example I don't use the values read from the channel. Can I call Reset() without calling Stop()?

Upvotes: 1

Views: 5778

Answers (3)

torek

Reputation: 487705

You might consider a different overall design.

Suppose, for instance, that we write a routine or interface called Deadliner (it could become its own package if you like, or just be an interface, and we'll see a pretty strong resemblance to something Go already has) whose job / contract is described this way:

  • The user of a Deadliner creates a Deadline whenever they like.
  • The Deadliner waits until the deadline occurs, then flags the deadline as having occurred.
  • A Deadliner can be canceled by any goroutine at any time. This flags the Deadliner as canceled, so that anyone waiting on it will stop waiting, and can tell that the reason they stopped waiting was "canceled" (not "expired"). It helps clean up resources for gc as well, in case you create a lot of Deadliners and then discard them before their timeout fires.

Now in your top level, before you start waiting for a message, you simply set up a deadline. This isn't a timer (even if it may use one internally), it's just a Deadliner instance. Then you wait for one of two events:

d, cancel := newDeadline(when)
for {
    select {
    case <-d.Done():
        // Deadline expired.
        // ... handle it ...
        d, cancel = newDeadline(when) // if/as appropriate
    case m := <-msgC:
        // got message - cancel existing deadline and get new one
        cancel()
        d, cancel = newDeadline(when)
        // ... handle the message m ...
    }
}

Now we just note that Go already has this: it's in package context. d is a context; newDeadline is context.WithDeadline or context.WithTimeout (depending on whether you want to compute the deadline time yourself, or have the timeout code add a duration to "now").

There is no need to fiddle with timers and time-tick channels and no need to spin off your own separate goroutines.

If the deadline doesn't reset on a single message, but rather on a particular combination of messages, you just write that in your case m := <-msgC section. If messages aren't currently received via channels, make that happen by putting messages into a channel, so that you can use this very simple wait-for-deadline-or-message pattern.

Upvotes: 1

Oleksii Miroshnyk

Reputation: 470

I simplified the code using the time.After function:

package main

import (
    "log"
    "math/rand"
    "time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
    const interval = time.Second * TimeOutTime
    // channel for incoming messages
    var incomeCh = make(chan struct{})

    go func() {
        for {
            // On each iteration new timer is created
            select {
            case <-time.After(interval):
                time.Sleep(time.Second)
                log.Println("Do task")
            case <-incomeCh:
                log.Println("Handle income message and move to the next iteration")
            }
        }
    }()

    go func() {
        for {
            time.Sleep(time.Duration(rand.Intn(MeanArrivalTime)) * time.Second)
            // generate incoming message
            incomeCh <- struct{}{}
        }
    }()

    // keep main alive for a while
    <-time.After(10 * time.Second)
}

Note that:

After waits for the duration to elapse and then sends the current time on the returned channel. It is equivalent to NewTimer(d).C. The underlying Timer is not recovered by the garbage collector until the timer fires. If efficiency is a concern, use NewTimer instead and call Timer.Stop if the timer is no longer needed.

Upvotes: 3

Burak Serdar

Reputation: 51467

Assume you have:

t.Stop()
t.Reset(d)

If the timer is stopped and drained before calling Stop, this works fine. The problem manifests itself if Stop stops the timer and the timer ticks at the same time. Then you may end up with a stopped timer with a goroutine waiting to write to the t.C channel. So Stop returns false if there is still a goroutine waiting to write to t.C, and you have to read from it. Otherwise, you'll have that goroutine waiting there indefinitely.

So, as you already observed, you have to do:

if !t.Stop() {
    <-t.C
}
t.Reset(d)

However, even with that, I think your solution is flawed because of the use of asynchronous resets. Instead, try using a new timer for each simulated event.

Upvotes: 1
