Reputation: 2638
I am new to go, but have worked with concurrency before. I am having an issue sharing a slice between multiple goroutines not containing the same data between all of the goroutines. I use a mutex as well to lock the struct when I modify the slice, but it doesn't seem to help. I have attached my code and would like to know what I am doing wrong, thanks for any help!
type State struct {
waiting int32
processing int32
completed int32
}
type Scheduler struct {
sync.Mutex
items chan interface{}
backPressure []interface{}
capacity int
canceler context.CancelFunc
state State
}
func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) Scheduler {
ctx, cancel := context.WithCancel(context.Background())
state := State{}
atomic.StoreInt32(&state.waiting, 0)
atomic.StoreInt32(&state.processing, 0)
atomic.StoreInt32(&state.completed, 0)
scheduler := Scheduler{
items: make(chan interface{}, capacity),
backPressure: make([]interface{}, 0),
capacity: capacity,
canceler: cancel,
state: state,
}
scheduler.initializeWorkers(ctx, handler)
return scheduler
}
func (s *Scheduler) initializeWorkers(ctx context.Context, handler func(interface {}) (interface{}, error)) {
for i := 0; i < 5; i++ {
go s.newWorker(ctx, handler)
}
}
func (s *Scheduler) newWorker(ctx context.Context, handler func(interface {}) (interface{}, error)) {
backoff := 0
for {
select {
case <-ctx.Done():
return
case job := <- s.items:
atomic.AddInt32(&s.state.waiting, -1)
atomic.AddInt32(&s.state.processing, 1)
job, _ = handler(job)
backoff = 0
atomic.AddInt32(&s.state.processing, -1)
atomic.AddInt32(&s.state.completed, 1)
default:
backoff += 1
s.CheckBackPressure()
time.Sleep(time.Duration(backoff * 10) * time.Millisecond)
}
}
}
func (s *Scheduler) AddItem(item interface{}) {
atomic.AddInt32(&s.state.waiting, 1)
if len(s.items) < s.capacity {
select {
case s.items <- item:
return
}
}
s.Lock()
defer s.Unlock()
s.backPressure = append(s.backPressure, item)
fmt.Printf("new backpressure len %v \n", len(s.backPressure))
return
}
func (s *Scheduler) Process() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
if atomic.LoadInt32(&s.state.waiting) == 0 && atomic.LoadInt32(&s.state.processing) == 0 {
return
}
runtime.Gosched()
}
}()
wg.Wait()
}
func (s *Scheduler) CheckBackPressure() {
s.Lock()
defer s.Unlock()
if len(s.backPressure) == 0 || s.capacity <= len(s.items) {
fmt.Printf("backpressure = %d :: len = %d cap = %d \n", len(s.backPressure), len(s.items), s.capacity)
return
}
fmt.Printf("releasing backpressure \n")
job, tmp := s.backPressure[0], s.backPressure[1:]
s.backPressure = tmp
s.items <- job
return
}
func (s *Scheduler) Stop() {
s.canceler()
}
This is the code that I am using to test the functionality:
type Job struct {
Value int
}
func TestSchedulerExceedingCapacity(t *testing.T) {
handler := func (ptr interface{}) (interface{}, error) {
job, ok := (ptr).(*Job)
if ok != true {
return nil, errors.New("failed to convert job")
}
// simulate work
time.Sleep(50 * time.Millisecond)
return job, nil
}
scheduler := NewScheduler(5, handler)
for i := 0; i < 25; i++ {
scheduler.AddItem(&(Job { Value: i }))
}
fmt.Printf("PROCESSING\n")
scheduler.Process()
fmt.Printf("FINISHED\n")
}
When I update the slice that holds the back pressure, it seems to indicate that it was correctly updated by printing new backpressure len 1
for 1-16.
However, when I check the back pressure from the worker, it indicates that the backpressure slice is empty. backpressure = 0 :: len = 0 cap = 5
.
Also "releasing backpressure" is never printed to stdout either.
Here is some additional output...
=== RUN TestSchedulerExceedingCapacity
new backpressure len 1
new backpressure len 2
new backpressure len 3
new backpressure len 4
new backpressure len 5
new backpressure len 6
new backpressure len 7
new backpressure len 8
backpressure = 0 :: len = 0 cap = 5
new backpressure len 9
new backpressure len 10
new backpressure len 11
new backpressure len 12
new backpressure len 13
new backpressure len 14
new backpressure len 15
new backpressure len 16
PROCESSING
backpressure = 0 :: len = 0 cap = 5
backpressure = 0 :: len = 0 cap = 5
backpressure = 0 :: len = 0 cap = 5
...
If I don't kill the test it indefinitely prints backpressure = 0 :: len = 0 cap = 5
I am assuming that I am not correctly synchronizing the changes, I would REALLY appreciate any insights, thanks!
Upvotes: 3
Views: 99
Reputation: 2638
Okay I was able to figure this out of course once I posted the question...
I saw somewhere that suggested to run the test with the -race
option which enables the data race detector. I immediately got errors which helped make the problem easier to debug.
It turns out that the problem was related to returning the value of NewScheduler
rather than the pointer of new scheduler. I changed that function to the following code, which fixed the issue.
func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
state := State{}
atomic.StoreInt32(&state.waiting, 0)
atomic.StoreInt32(&state.processing, 0)
atomic.StoreInt32(&state.completed, 0)
atomic.StoreInt32(&state.errors, 0)
scheduler := Scheduler{
items: make(chan interface{}, capacity),
backPressure: make([]interface{}, 0),
capacity: capacity,
canceler: cancel,
state: state,
}
scheduler.initializeWorkers(ctx, handler)
return &scheduler
}
Upvotes: 3