Varun Patro

Reputation: 2179

Selecting between time interval and length of channel

I'm looking for the most idiomatic way to do the following task.

Task:

Write data from a channel to a file.

Problem:

I have a channel ch := make(chan int, 100)

I need to read from the channel and write the values I read to a file. My question is how to do this given that:

  1. If channel ch is full, write the values immediately
  2. If channel ch is not full, write every 5s.

So data needs to be written to the file at least every 5s (assuming new data arrives on the channel at least every 5s).

What's the best way to use select, for, and range to do this?

Thanks!

Upvotes: 1

Views: 1534

Answers (2)

icza

Reputation: 417692

There is no such "event" as "buffer of channel is full", so you can't detect that [*]. This means you can't idiomatically solve your problem with language primitives using only 1 channel.

[*] Not entirely true: you could detect whether the buffer of a channel is full by using a select with a default case when sending on the channel, but that requires logic in the senders, and repeated attempts to send.
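A minimal sketch of that sender-side check, using a hypothetical helper trySend (not part of the answer). A failed non-blocking send on a buffered channel with no waiting receiver means its buffer is full; the sender still has to decide what to do in that case, which is the extra logic mentioned above.

```go
package main

import "fmt"

// trySend attempts a non-blocking send on ch. It returns false when the send
// cannot proceed immediately, which for a buffered channel with no waiting
// receiver means the buffer is full.
func trySend(ch chan<- int, v int) bool {
    select {
    case ch <- v:
        return true
    default:
        return false
    }
}

func main() {
    ch := make(chan int, 2)
    // Nobody receives, so the third send finds the buffer full.
    fmt.Println(trySend(ch, 1), trySend(ch, 2), trySend(ch, 3)) // true true false
}
```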

I would receive the values from an incoming channel and "redirect" them, i.e. store them in another, internal channel that has the buffer of 100 you mentioned. After each redirection, check whether the internal channel's buffer is full, and if so, do an immediate write. If not, keep monitoring the "incoming" channel and a timer channel with a select statement, and when the timer fires, do a "regular" write.

You can use len(chInternal) to check how many elements are in the chInternal channel, and cap(chInternal) to check its capacity. This is "safe" because ours is the only goroutine handling the chInternal channel. If multiple goroutines used it, the value returned by len(chInternal) could be outdated by the time we use it for something (e.g. in a comparison).
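A small sketch of len and cap on a buffered channel, with a hypothetical isFull helper. As noted above, the result is only trustworthy when a single goroutine owns the channel.

```go
package main

import "fmt"

// isFull reports whether ch's buffer is full. It is only reliable when a
// single goroutine sends on and receives from ch; otherwise the answer may be
// stale by the time it is used. For an unbuffered channel len and cap are
// both 0, so it would always report true.
func isFull(ch chan int) bool {
    return len(ch) == cap(ch)
}

func main() {
    ch := make(chan int, 100)
    for i := 0; i < 3; i++ {
        ch <- i
    }
    // len: values currently queued; cap: buffer size passed to make.
    fmt.Println(len(ch), cap(ch), isFull(ch)) // 3 100 false
}
```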

In this solution chInternal (as its name says) is for internal use only; others should only send values on ch. Note that ch may or may not be a buffered channel; the solution works in both cases. However, giving ch a buffer too may improve efficiency, since senders will be less likely to block.

var (
    chInternal = make(chan int, 100)
    ch         = make(chan int) // You may (should) make this a buffered channel too
)

func main() {
    delay := time.Second * 5
    timer := time.NewTimer(delay)
    for {
        select {
        case v := <-ch:
            chInternal <- v
            if len(chInternal) == cap(chInternal) {
                doWrite() // Buffer is full, we need to write immediately
                timer.Reset(delay)
            }
        case <-timer.C:
            doWrite() // "Regular" write: 5 seconds have passed since last write
            timer.Reset(delay)
        }
    }
}

If an immediate write happens (due to a "buffer full" situation), this solution schedules the next "regular" write 5 seconds after it. If you don't want this, and want the 5-second regular writes to be independent of the immediate writes, simply don't reset the timer after an immediate write.

An implementation of doWrite() may be as follows:

var f *os.File // Make sure to open file for writing

func doWrite() {
    for {
        select {
        case v := <-chInternal:
            fmt.Fprintf(f, "%d ", v) // Write v to the file
        default: // Stop when no more values in chInternal
            return
        }
    }
}

We can't use for ... range, as that only ends when the channel is closed, and our chInternal channel is never closed. Instead we use a select with a default case, so we return as soon as there are no more values in the buffer of chInternal.
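The same drain pattern as a standalone sketch; drain is a hypothetical name, and it collects the values into a slice instead of writing them to the file. It assumes the channel is never closed: on a closed channel the receive case would always be ready and the loop would never end.

```go
package main

import "fmt"

// drain receives every value currently buffered in ch without blocking and
// returns them. Unlike for ... range, it returns as soon as the buffer is
// empty, even though ch is not closed. Do not call it on a closed channel.
func drain(ch chan int) []int {
    var out []int
    for {
        select {
        case v := <-ch:
            out = append(out, v)
        default:
            return out // buffer is empty
        }
    }
}

func main() {
    ch := make(chan int, 10)
    ch <- 1
    ch <- 2
    fmt.Println(drain(ch), len(ch)) // [1 2] 0
}
```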

Improvements

Using a slice instead of 2nd channel

Since the chInternal channel is only used by us, and only on a single goroutine, we may also choose to use a single []int slice instead of a channel (reading/writing a slice is much faster than a channel).

Showing only the different / changed parts, it could look something like this:

var (
    buf = make([]int, 0, 100)
)

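func main() {
    // ...

    for {
        select {
        case v := <-ch:
            buf = append(buf, v)
            if len(buf) == cap(buf) {
                doWrite() // Buffer is full, we need to write immediately
                timer.Reset(delay)
            }
        // ... (timer case unchanged)
        }
    }
}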

func doWrite() {
    for _, v := range buf {
        fmt.Fprintf(f, "%d ", v) // Write v to the file
    }
    buf = buf[:0] // "Clear" the buffer
}

With multiple goroutines

If we keep chInternal as a channel, doWrite() may be called on another goroutine (e.g. go doWrite()) so that it does not block the main loop. Since the data to write is read from a channel (chInternal), this requires no further synchronization.

Upvotes: 3

user6169399

If you only use the 5-second write to improve file-write performance, you don't need a timer at all: fill the channel whenever you need to, and let a writer goroutine write that data to a buffered file. Here is a very simple and idiomatic sample that uses just for...range:

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
)

var wg sync.WaitGroup

func WriteToFile(filename string, ch chan int) {
    f, e := os.Create(filename)
    if e != nil {
        panic(e)
    }
    w := bufio.NewWriterSize(f, 4*1024*1024)
    defer wg.Done()
    defer f.Close()
    defer w.Flush()
    for v := range ch {
        fmt.Fprintf(w, "%d ", v)
    }
}

func main() {
    ch := make(chan int, 100)
    wg.Add(1)
    go WriteToFile("file.txt", ch)

    for i := 0; i < 500000; i++ {
        ch <- i // do the job
    }
    close(ch) // Finish the job and close output file
    wg.Wait()
}

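Note the order of the defer statements: deferred calls run in reverse order, so w.Flush() runs first, then f.Close(), and wg.Done() runs last.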
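A tiny sketch of that last-in, first-out rule, using a hypothetical order function that records the sequence instead of flushing or closing anything real:

```go
package main

import "fmt"

// order mimics the defer layout of WriteToFile. Deferred calls run last-in,
// first-out when the function returns, and can modify the named result.
func order() (steps []string) {
    defer func() { steps = append(steps, "wg.Done") }() // deferred first, runs last
    defer func() { steps = append(steps, "f.Close") }()
    defer func() { steps = append(steps, "w.Flush") }() // deferred last, runs first
    steps = append(steps, "write loop")
    return steps
}

func main() {
    fmt.Println(order()) // [write loop w.Flush f.Close wg.Done]
}
```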

If you do need a write every 5 seconds, you can add an interval timer whose only job is to flush the file's buffer, like this:

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup

func WriteToFile(filename string, ch chan int) {
    f, e := os.Create(filename)
    if e != nil {
        panic(e)
    }
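    w := bufio.NewWriterSize(f, 4*1024*1024)
    // bufio.Writer is not safe for concurrent use, and w is used both by
    // the ticker goroutine below and by this goroutine, so guard it.
    var mu sync.Mutex

    ticker := time.NewTicker(5 * time.Second)
    quit := make(chan struct{})
    go func() {
        for {
            select {
            case <-ticker.C:
                mu.Lock()
                if w.Buffered() > 0 {
                    fmt.Println(w.Buffered())
                    w.Flush()
                }
                mu.Unlock()
            case <-quit:
                ticker.Stop()
                return
            }
        }
    }()

    defer wg.Done()
    defer f.Close()
    defer func() { // final flush, also under the lock
        mu.Lock()
        w.Flush()
        mu.Unlock()
    }()
    defer close(quit)
    for v := range ch {
        mu.Lock()
        fmt.Fprintf(w, "%d ", v)
        mu.Unlock()
    }
}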

func main() {
    ch := make(chan int, 100)
    wg.Add(1)
    go WriteToFile("file.txt", ch)

    for i := 0; i < 25; i++ {
        ch <- i // do the job
        time.Sleep(500 * time.Millisecond)
    }
    close(ch) // Finish the job and close output file
    wg.Wait()
}

Here I used time.NewTicker(5 * time.Second) as the interval timer, together with a quit channel; you could also use time.AfterFunc(), time.Tick(), or time.Sleep().

With some optimizations (removing the quit channel and handling both the values and the ticker in a single select):

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup

func WriteToFile(filename string, ch chan int) {
    f, e := os.Create(filename)
    if e != nil {
        panic(e)
    }
    w := bufio.NewWriterSize(f, 4*1024*1024)
    ticker := time.NewTicker(5 * time.Second)
    defer wg.Done()
    defer f.Close()
    defer w.Flush()

    for {
        select {
        case v, ok := <-ch:
            if ok {
                fmt.Fprintf(w, "%d ", v)
            } else {
                fmt.Println("done.")
                ticker.Stop()
                return
            }
        case <-ticker.C:
            if w.Buffered() > 0 {
                fmt.Println(w.Buffered())
                w.Flush()
            }
        }
    }
}
func main() {
    ch := make(chan int, 100)
    wg.Add(1)
    go WriteToFile("file.txt", ch)

    for i := 0; i < 25; i++ {
        ch <- i // do the job
        time.Sleep(500 * time.Millisecond)
    }
    close(ch) // Finish the job and close output file
    wg.Wait()
}

I hope this helps.

Upvotes: 0
