David Alsh

Reputation: 7669

When trying to implement an io.Reader that uses channels I get a fatal error

I apologise for the very simple question; I'm just having some trouble wrapping my head around how to implement io.Reader.

My final use case is that I am consuming an endless stream that can send data at any time. To simulate this I have created an emitter that implements io.Reader and io.Writer.

I listen to the reader using a bufio.Scanner concurrently, while sending values to any listener from main.

Playground: https://goplay.space/#eJfe0HyfYrL


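package main

import (
    "bufio"
    "fmt"
    "sync"
    "time"
)

func main() {
    wg := &sync.WaitGroup{}
    wg.Add(1)

    data := newEmitter()

    // Reader goroutine: print each token the scanner produces.
    go func() {
        scanner := bufio.NewScanner(data)
        for scanner.Scan() {
            fmt.Println(scanner.Text())
        }
        wg.Done()
    }()

    // Writer side: send two values with a pause in between.
    data.WriteString("foo")
    time.Sleep(2 * time.Second)
    data.WriteString("bar")

    wg.Wait()
}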

My emitter

type emitter struct {
    ch chan []byte
}

func (em *emitter) Read(b []byte) (int, error) {
    n := copy(b, <-em.ch)
    return n, nil
}

func (em *emitter) Write(b []byte) (int, error) {
    em.ch <- b
    return len(b), nil
}

func (em *emitter) WriteString(s string) (int, error) {
    return em.Write([]byte(s))
}

func newEmitter() *emitter {
    return &emitter{
        ch: make(chan []byte),
    }
}

I get the following error:

fatal error: all goroutines are asleep - deadlock!

Upvotes: 0

Views: 2048

Answers (1)

Thundercat

Reputation: 121009

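The reader goroutine blocks in emitter.Read on the receive from em.ch, and the main goroutine blocks in wg.Wait. Once the last value has been sent, nothing ever sends again or closes the channel, so neither goroutine can proceed. Deadlock!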

Fix by adding a Close() method to emitter that closes em.ch. Return io.EOF from emitter.Read when the channel is closed, and close the emitter when you are done sending data.

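There are other issues with the emitter: Read silently drops any bytes that do not fit in b, and Write hands the caller's slice to another goroutine, although io.Writer implementations must not retain the slice they are given. Also note that bufio.Scanner only emits a line once it sees a newline or EOF, which is why the writes below end in \n. Instead of fixing emitter, use io.Pipe to connect the writer to the reader: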

wg := &sync.WaitGroup{}
wg.Add(1)

pr, pw := io.Pipe()

go func() {
    scanner := bufio.NewScanner(pr)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
    wg.Done()
}()

io.WriteString(pw, "foo\n")
time.Sleep(2 * time.Second)
io.WriteString(pw, "bar\n")
pw.Close()

wg.Wait()


Upvotes: 3
