Gustav Eiman

Reputation: 111

Wait for N items in channel before executing sequentially

So I am very new to Go! But I had this idea about something I wanted to try.

I would like to have a goroutine that accepts strings from a channel, but only after it has received N strings should it execute on them.

I looked around for similar questions or cases, but I only found ones where the idea was to execute several routines in parallel and wait to aggregate the result.

I thought about creating an array and passing it to a routine once its length was sufficient. However, I want to keep a certain separation of concerns and control this on the receiving end.
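Roughly, what I mean by the slice idea is something like this (just a sketch; `batchStrings` is an illustrative name, not real code I have):

```go
package main

import "fmt"

// batchStrings groups in into complete batches of n; any trailing
// partial batch is returned separately so the caller can decide
// whether to flush or drop it.
func batchStrings(in []string, n int) (full [][]string, rest []string) {
	for len(in) >= n {
		full = append(full, in[:n])
		in = in[n:]
	}
	return full, in
}

func main() {
	msgs := []string{"Msg 1", "Msg 2", "Msg 3", "Msg 4"}
	full, rest := batchStrings(msgs, 3)
	for _, batch := range full {
		fmt.Println(batch) // prints [Msg 1 Msg 2 Msg 3]
	}
	fmt.Println("leftover:", rest) // prints leftover: [Msg 4]
}
```

The downside, as said, is that the batching logic lives on the sending side rather than the receiving end.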

My questions are:

  1. Is this bad practice for some reason?
  2. Is there a better way to do this, what is it?

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        ch := make(chan string)
        go func() {
            tasks := []string{}
            for {
                tasks = append(tasks, <-ch)

                if len(tasks) < 3 {
                    fmt.Println("Queue still too small")
                }
                if len(tasks) >= 3 {
                    for i := 0; i < len(tasks); i++ {
                        fmt.Println(tasks[i])
                    }
                    tasks = nil // reset so the next batch starts fresh
                }
            }
        }()

        ch <- "Msg 1"
        time.Sleep(time.Second)
        ch <- "Msg 2"
        time.Sleep(time.Second)
        ch <- "Msg 3"
        time.Sleep(time.Second)
        ch <- "Msg 4"
        time.Sleep(time.Second)
    }
    

Edit: simplified for a more accurate example.

Upvotes: 1

Views: 1875

Answers (2)

poy

Reputation: 10537

Based on a few comments, it looks like what you are looking for is some form of batching.

With batching, there are a few scenarios in which you would want to take the batch and send it along:

  1. The batch has reached a sufficient size
  2. Enough time has passed and a partial batch should be flushed

Your given example does not account for the second scenario. This can lead to awkward behavior if the batch never flushes simply because load stops arriving.

Therefore I would recommend either looking into a library (e.g., cloudfoundry/go-batching) or simply using channels, a Timer, and a select statement.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go func() {
        tasks := []string{}
        timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
        for {
            select {
            case <-timer.C:
                fmt.Println("Flush partial batch due to time")
                flush(tasks)
                tasks = nil
                timer.Reset(time.Second)
            case data := <-ch:
                tasks = append(tasks, data)

                // Reset the timer for each data point so that we only flush
                // partial batches when we stop receiving data.
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(time.Second)

            // Guard clause for batch size
                if len(tasks) < 3 {
                    fmt.Println("Queue still too small")
                    continue
                }

                flush(tasks)
                tasks = nil // reset tasks
            }
        }
    }()

    ch <- "Msg 1"
    time.Sleep(time.Second)
    ch <- "Msg 2"
    time.Sleep(time.Second)
    ch <- "Msg 3"
    time.Sleep(time.Second)
    ch <- "Msg 4"
    time.Sleep(time.Second)
}

func flush(tasks []string) {
    // Guard against empty flushes
    if len(tasks) == 0 {
        return
    }

    fmt.Println("Flush")
    for _, t := range tasks {
        fmt.Println(t)
    }
}

Upvotes: 3

dustinevan

Reputation: 977

I can see how something that batches results could be useful, but it does require a custom solution. There are many ways to solve this problem. I tried using sync.WaitGroup, but it got messy. Using a sync.Mutex to lock the batching function seems to work best here. Still, when a mutex is the nicest answer, that should (in my opinion) trigger a recheck of the design, because a mutex should be the last option.

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {

    ctx, canc := context.WithCancel(context.Background())
    acc := NewAccumulator(4, ctx)
    go func() {
        for i := 0; i < 10; i++ {
            acc.Write("hi")
        }
        canc()
    }()

    read := acc.ReadChan()
    for batch := range read {
        fmt.Println(batch)
    }
    fmt.Println("done")
}

type Accumulator struct {
    count    int64
    size     int
    in       chan string
    out      chan []string
    ctx      context.Context
    doneFlag int64
    mu       sync.Mutex
}

func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
    a := &Accumulator{
        size: size,
        in:   make(chan string, size),
        out:  make(chan []string, 1),
        ctx:  parentCtx,
    }

    go func() {
        <-a.ctx.Done()
        atomic.AddInt64(&a.doneFlag, 1)
        close(a.in)
        a.mu.Lock()
        a.batch()
        a.mu.Unlock()
        close(a.out)
    }()
    return a
}

func (a *Accumulator) Write(s string) {
    if atomic.LoadInt64(&a.doneFlag) > 0 {
        panic("write to closed accumulator")
    }
    a.in <- s
    atomic.AddInt64(&a.count, 1)
    a.mu.Lock()
    if atomic.LoadInt64(&a.count) == int64(a.size) {
        a.batch()
    }
    a.mu.Unlock()
}

func (a *Accumulator) batch() {
    batch := make([]string, 0)
    for i := 0; i < a.size; i++ {
        msg := <-a.in
        if msg != "" {
            batch = append(batch, msg)
        }
    }
    fmt.Println("batching", batch)
    a.out <- batch
    atomic.StoreInt64(&a.count, 0)
}

func (a *Accumulator) ReadChan() <-chan []string {
    return a.out
}

It would probably be best to just have a slice that accumulates strings and, when that slice reaches some size, kick off some processing.
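A mutex-free version of that slice-accumulation idea might look like this (a sketch; `accumulate` is an illustrative name). A single goroutine owns the slice, so no locking is needed, and closing the input channel flushes the trailing partial batch:

```go
package main

import "fmt"

// accumulate reads strings from in and emits a batch every n items.
// When in is closed, any partial batch is flushed before the returned
// channel is closed.
func accumulate(in <-chan string, n int) <-chan []string {
	out := make(chan []string)
	go func() {
		defer close(out)
		var tasks []string
		for s := range in {
			tasks = append(tasks, s)
			if len(tasks) == n {
				out <- tasks
				tasks = nil // start a fresh batch
			}
		}
		if len(tasks) > 0 {
			out <- tasks // flush the trailing partial batch
		}
	}()
	return out
}

func main() {
	in := make(chan string)
	go func() {
		for i := 0; i < 10; i++ {
			in <- "hi"
		}
		close(in)
	}()

	for batch := range accumulate(in, 4) {
		fmt.Println(batch) // two batches of 4, then a batch of 2
	}
	fmt.Println("done")
}
```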

Upvotes: 2
