Reputation: 111
So I am very new to Go! But I had an idea about something I wanted to try.
I would like to have a goroutine that accepts strings from a channel, but only executes on them after it has received N strings.
I looked around for similar questions or cases, but I only found ones where the idea was to execute several goroutines in parallel and wait to aggregate the results.
I thought about creating an array and passing it to a goroutine once its length was sufficient. However, I want to keep a certain separation of concerns and control this on the receiving end.
My question is: is there a better way to do this, and if so, what is it?
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go func() {
        tasks := []string{}
        for {
            tasks = append(tasks, <-ch)
            if len(tasks) < 3 {
                fmt.Println("Queue still too small")
            }
            if len(tasks) > 3 {
                for i := 0; i < len(tasks); i++ {
                    fmt.Println(tasks[i])
                }
            }
        }
    }()
    ch <- "Msg 1"
    time.Sleep(time.Second)
    ch <- "Msg 2"
    time.Sleep(time.Second)
    ch <- "Msg 3"
    time.Sleep(time.Second)
    ch <- "Msg 4"
    time.Sleep(time.Second)
}
Edit: simplified for a more accurate example.
Upvotes: 1
Views: 1875
Reputation: 10537
Based on a few comments, it looks like what you are looking for is some form of batching.
Batching has a few scenarios where you would want to take the batch and send it along:
- The batch has reached its target size.
- Enough time has passed since the last item arrived, so a partial batch should be flushed.
Your given example does not account for the second scenario. This can lead to some awkward behavior if you never flush because the load stops coming.
Therefore I would recommend either looking into a library (e.g., cloudfoundry/go-batching) or simply using channels, a Timer and a select statement.
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go func() {
        tasks := []string{}
        timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
        for {
            select {
            case <-timer.C:
                fmt.Println("Flush partial batch due to time")
                flush(tasks)
                tasks = nil
                timer.Reset(time.Second)
            case data := <-ch:
                tasks = append(tasks, data)

                // Reset the timer for each data point so that we only flush
                // partial batches when we stop receiving data.
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(time.Second)

                // Guard clause for batch size
                if len(tasks) < 3 {
                    fmt.Println("Queue still too small")
                    continue
                }

                flush(tasks)
                tasks = nil // reset tasks
            }
        }
    }()

    ch <- "Msg 1"
    time.Sleep(time.Second)
    ch <- "Msg 2"
    time.Sleep(time.Second)
    ch <- "Msg 3"
    time.Sleep(time.Second)
    ch <- "Msg 4"
    time.Sleep(time.Second)
}

func flush(tasks []string) {
    // Guard against empty flushes
    if len(tasks) == 0 {
        return
    }

    fmt.Println("Flush")
    for _, t := range tasks {
        fmt.Println(t)
    }
}
Upvotes: 3
Reputation: 977
I can see how something that batches results could be useful. But it does require a custom solution. There are many ways to solve this problem--I tried using Sync.WaitGroup
but it got messy. It seems like using a sync.Mutex
to lock the batching function is the best way. But, when mutex is the nicest answer imo that should trigger a recheck of the design because, again imo, it should be the last option.
package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    ctx, canc := context.WithCancel(context.Background())
    acc := NewAccumulator(4, ctx)
    go func() {
        for i := 0; i < 10; i++ {
            acc.Write("hi")
        }
        canc()
    }()

    read := acc.ReadChan()
    for batch := range read {
        fmt.Println(batch)
    }
    fmt.Println("done")
}

type Accumulator struct {
    count    int64
    size     int
    in       chan string
    out      chan []string
    ctx      context.Context
    doneFlag int64
    mu       sync.Mutex
}

func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
    a := &Accumulator{
        size: size,
        in:   make(chan string, size),
        out:  make(chan []string, 1),
        ctx:  parentCtx,
    }

    go func() {
        <-a.ctx.Done()
        atomic.AddInt64(&a.doneFlag, 1)
        close(a.in)
        a.mu.Lock()
        a.batch()
        a.mu.Unlock()
        close(a.out)
    }()
    return a
}

func (a *Accumulator) Write(s string) {
    if atomic.LoadInt64(&a.doneFlag) > 0 {
        panic("write to closed accumulator")
    }
    a.in <- s
    atomic.AddInt64(&a.count, 1)
    a.mu.Lock()
    if atomic.LoadInt64(&a.count) == int64(a.size) {
        a.batch()
    }
    a.mu.Unlock()
}

func (a *Accumulator) batch() {
    batch := make([]string, 0)
    for i := 0; i < a.size; i++ {
        msg := <-a.in
        if msg != "" {
            batch = append(batch, msg)
        }
    }
    fmt.Println("batching", batch)
    a.out <- batch
    atomic.StoreInt64(&a.count, 0)
}

func (a *Accumulator) ReadChan() <-chan []string {
    return a.out
}
It would probably be best to just have a slice that accumulates strings, and when that slice reaches some size, kick off some processing.
Upvotes: 2