Reputation: 1185
In the following scenario, a network entity always waits TimeOutTime seconds before doing a particular task X. Call this timer TimerT. If the entity receives a set of external messages during this wait, it should reset TimerT to TimeOutTime again. If no external messages are received, the expected behaviour is as follows:
TimeOutTime
(By reset I mean: stop the timer and start it over again.)
To simulate the scenario I wrote the following code in Go.
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
	rand.Seed(time.Now().UTC().UnixNano())
	var wg sync.WaitGroup
	t := time.NewTimer(time.Second * time.Duration(TimeOutTime))
	wg.Add(1)
	// goroutine that handles the timeout event
	go func() {
		defer wg.Done()
		for {
			t1 := time.Now()
			<-t.C
			t2 := time.Now()
			// Do.. task X .. on timeout...
			log.Println("Timeout after ", t2.Sub(t1))
			t.Reset(time.Second * time.Duration(TimeOutTime))
		}
	}()
	// second goroutine: simulates incoming messages
	go func() {
		for {
			// simulates an incoming message at any time
			time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))
			// once any message is received, reset the timer to TimeOutTime seconds again
			t.Reset(time.Second * time.Duration(TimeOutTime))
		}
	}()
	wg.Wait()
}
After running this program with the -race flag, it reports a data race:
==================
WARNING: DATA RACE
Write at 0x00c0000c2068 by goroutine 8:
time.(*Timer).Reset()
/usr/local/go/src/time/sleep.go:125 +0x98
main.main.func1()
/home/deka/Academic/go/src/main/test.go:29 +0x18f
Previous write at 0x00c0000c2068 by goroutine 9:
time.(*Timer).Reset()
/usr/local/go/src/time/sleep.go:125 +0x98
main.main.func2()
/home/deka/Academic/go/src/main/test.go:42 +0x80
Goroutine 8 (running) created at:
main.main()
/home/deka/Academic/go/src/main/test.go:20 +0x1d3
Goroutine 9 (running) created at:
main.main()
/home/deka/Academic/go/src/main/test.go:35 +0x1f5
==================
Then I used a Mutex to guard both Reset() calls.
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
	rand.Seed(time.Now().UTC().UnixNano())
	var wg sync.WaitGroup
	t := time.NewTimer(time.Second * time.Duration(TimeOutTime))
	wg.Add(1)
	var mu sync.Mutex
	// goroutine that handles the timeout event
	go func() {
		defer wg.Done()
		for {
			t1 := time.Now()
			<-t.C
			t2 := time.Now()
			// Do.. task X .. on timeout...
			log.Println("Timeout after ", t2.Sub(t1))
			mu.Lock()
			t.Reset(time.Second * time.Duration(TimeOutTime))
			mu.Unlock()
		}
	}()
	// second goroutine: simulates incoming messages
	go func() {
		for {
			// simulates an incoming message at any time
			time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))
			// once any message is received, reset the timer to TimeOutTime seconds again
			mu.Lock()
			t.Reset(time.Second * time.Duration(TimeOutTime))
			mu.Unlock()
		}
	}()
	wg.Wait()
}
After this change the code seems to work fine, based on the following observation. If I replace the line
	time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))
in the second goroutine with a constant sleep of 4 seconds, while TimeOutTime stays constant at 3 seconds, the output of the program is:
2020/02/29 20:10:11 Timeout after 3.000160828s
2020/02/29 20:10:15 Timeout after 4.000444017s
2020/02/29 20:10:19 Timeout after 4.000454657s
2020/02/29 20:10:23 Timeout after 4.000304877s
In the above execution, the second goroutine resets the active timer after the timer has already been running for one second. Because of this, the timer expires after 4 seconds from the second print onward.
Then I checked the documentation of Reset() and found the following:
// Reset changes the timer to expire after duration d.
// It returns true if the timer had been active, false if the timer had
// expired or been stopped.
//
// Reset should be invoked only on stopped or expired timers with drained channels.
// If a program has already received a value from t.C, the timer is known
// to have expired and the channel drained, so t.Reset can be used directly.
// If a program has not yet received a value from t.C, however,
// the timer must be stopped and—if Stop reports that the timer expired
// before being stopped—the channel explicitly drained:
//
// 	if !t.Stop() {
// 		<-t.C
// 	}
// 	t.Reset(d)
//
// This should not be done concurrent to other receives from the Timer's
// channel.
//
// Note that it is not possible to use Reset's return value correctly, as there
// is a race condition between draining the channel and the new timer expiring.
// Reset should always be invoked on stopped or expired channels, as described above.
// The return value exists to preserve compatibility with existing programs.
I found this diagram: (link : https://blogtitle.github.io/go-advanced-concurrency-patterns-part-2-timers/)
With the diagram in mind, it seems that I need to use
	if !t.Stop() {
		<-t.C
	}
	t.Reset(d)
in the second goroutine. In that case I also need proper locking in both goroutines to avoid waiting on the channel forever.
I don't understand the scenario in which t.Stop() plus draining the channel (<-t.C) should be performed. In which case is it required? In my example I don't use the values read from the channel. Can I call Reset() without calling Stop()?
Upvotes: 1
Views: 5778
Reputation: 487705
You might consider a different overall design.
Suppose for instance that we write a routine or interface called Deadliner—it could become its own package if you like, or just be an interface, and we'll see a pretty strong resemblance to something Go already has—whose job / contract is described this way:
Now in your top level, before you start waiting for a message, you simply set up a deadline. This isn't a timer (even if it may use one internally), it's just a Deadliner instance. Then you wait for one of two events:
d, cancel := newDeadline(when)
for {
	select {
	case <-d.Done():
		// Deadline expired.
		// ... handle it ...
		d, cancel = newDeadline(when) // if/as appropriate
	case m := <-msgC:
		// got message - cancel existing deadline and get new one
		cancel()
		d, cancel = newDeadline(when)
		// ... handle the message m
	}
}
Now we just note that Go already has this: it's in package context. d is a context; newDeadline is context.WithDeadline or context.WithTimeout (depending on whether you want to compute the deadline time yourself, or have the timeout code add a duration to "now").
There is no need to fiddle with timers and time-tick channels and no need to spin off your own separate goroutines.
If the deadline doesn't reset on a single message, but rather on a particular combination of messages, you just write that in your case m := <-msgC section. If messages aren't currently received via channels, make that happen by putting messages into a channel, so that you can use this very simple wait-for-deadline-or-message pattern.
Upvotes: 1
Reputation: 470
I simplified the code using the time.After function:
package main

import (
	"log"
	"math/rand"
	"time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
	const interval = time.Second * TimeOutTime
	// channel for incoming messages
	var incomeCh = make(chan struct{})
	go func() {
		for {
			// a new timer is created on each iteration
			select {
			case <-time.After(interval):
				time.Sleep(time.Second) // stands in for the work done by the task
				log.Println("Do task")
			case <-incomeCh:
				log.Println("Handle incoming message and move to the next iteration")
			}
		}
	}()
	go func() {
		for {
			time.Sleep(time.Duration(rand.Intn(MeanArrivalTime)) * time.Second)
			// generate an incoming message
			incomeCh <- struct{}{}
		}
	}()
	// keep main alive for a while
	<-time.After(10 * time.Second)
}
Note that:
After waits for the duration to elapse and then sends the current time on the returned channel. It is equivalent to NewTimer(d).C. The underlying Timer is not recovered by the garbage collector until the timer fires. If efficiency is a concern, use NewTimer instead and call Timer.Stop if the timer is no longer needed.
Upvotes: 3
Reputation: 51467
Assume you have:
	t.Stop()
	t.Reset(d)
If the timer is already stopped and drained before Stop is called, this works fine. The problem shows up if the timer fires at about the same time as Stop is called. Then you may end up with a stopped timer whose expiry value is still sitting unread in the t.C channel. Stop returns false in that case, and you have to read from the channel. Otherwise the stale value stays there, and the next receive after Reset returns immediately instead of waiting for the new duration.
So, as you already observed, you have to do:
	if !t.Stop() {
		<-t.C
	}
	t.Reset(d)
However, even with that, I think your solution is flawed because of the use of asynchronous resets. Instead, try using a new timer for each simulated event.
Upvotes: 1