Reputation: 7669
I apologise for the very simple question; I'm having some trouble wrapping my head around how to implement io.Reader.
My final use case is that I am consuming an endless stream that can send data at any time. To simulate this I have created an emitter that implements io.Reader and io.Writer.
A goroutine reads from the emitter with a bufio.Scanner while main concurrently writes values to it.
Playground: https://goplay.space/#eJfe0HyfYrL
func main() {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	data := newEmitter()
	go func() {
		scanner := bufio.NewScanner(data)
		for scanner.Scan() {
			fmt.Println(scanner.Text())
		}
		wg.Done()
	}()
	data.WriteString("foo")
	time.Sleep(2 * time.Second)
	data.WriteString("bar")
	wg.Wait()
}
My emitter:
type emitter struct {
	ch chan []byte
}
func (em *emitter) Read(b []byte) (int, error) {
	n := copy(b, <-em.ch)
	return n, nil
}
func (em *emitter) Write(b []byte) (int, error) {
	em.ch <- b
	return len(b), nil
}
func (em *emitter) WriteString(s string) (int, error) {
	return em.Write([]byte(s))
}
func newEmitter() *emitter {
	return &emitter{
		ch: make(chan []byte),
	}
}
I get the following error:
fatal error: all goroutines are asleep - deadlock!
Upvotes: 0
Views: 2048
Reputation: 121009
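After the last write, the scanner goroutine blocks in emitter.Read on the receive <-em.ch, because nothing will ever send again. The main goroutine blocks in wg.Wait, because the scanner goroutine never calls wg.Done. Deadlock!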
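Fix by adding a Close() method to emitter that closes em.ch. Return io.EOF from emitter.Read when the channel is closed. Close the emitter when done sending data. Also end each value with a newline: bufio.Scanner splits on lines by default, so without one it reports nothing until EOF.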
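There are other issues with the emitter: Read silently drops data when the received slice is larger than b, and Write hands the caller's slice to another goroutine, although the io.Writer contract says implementations must not retain it. Instead of fixing emitter, use io.Pipe to connect the writer to the reader: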
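wg := &sync.WaitGroup{}
wg.Add(1)
pr, pw := io.Pipe()
go func() {
	scanner := bufio.NewScanner(pr)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	wg.Done()
}()
// Each value ends in "\n" so the scanner can emit it as a line.
io.WriteString(pw, "foo\n")
time.Sleep(2 * time.Second)
io.WriteString(pw, "bar\n")
// Closing the writer makes the reader see io.EOF, which ends the scan loop.
pw.Close()
wg.Wait()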
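One of the other issues can be shown with a small experiment: when the caller's buffer is shorter than the written chunk, the channel-backed Read discards the remainder, while io.Pipe hands the leftover bytes to the next Read. The sketch below is illustrative only; lossyEmitter, readInChunks, viaEmitter and viaPipe are hypothetical names, and lossyEmitter is the question's emitter with a Close method added so that the read loop can terminate.

```go
package main

import (
	"fmt"
	"io"
)

// lossyEmitter mirrors the question's emitter, plus Close, to show the short-read problem.
type lossyEmitter struct {
	ch chan []byte
}

func (em *lossyEmitter) Read(b []byte) (int, error) {
	data, ok := <-em.ch
	if !ok {
		return 0, io.EOF
	}
	// copy stops at len(b); the rest of data is silently discarded.
	return copy(b, data), nil
}

func (em *lossyEmitter) Write(b []byte) (int, error) {
	em.ch <- b
	return len(b), nil
}

func (em *lossyEmitter) Close() error {
	close(em.ch)
	return nil
}

// readInChunks reads r to EOF through a deliberately small buffer.
func readInChunks(r io.Reader, size int) string {
	var out []byte
	buf := make([]byte, size)
	for {
		n, err := r.Read(buf)
		out = append(out, buf[:n]...)
		if err != nil {
			return string(out)
		}
	}
}

// viaEmitter sends msg through the channel-backed emitter.
func viaEmitter(msg string, size int) string {
	em := &lossyEmitter{ch: make(chan []byte)}
	go func() {
		em.Write([]byte(msg))
		em.Close()
	}()
	return readInChunks(em, size)
}

// viaPipe sends msg through io.Pipe.
func viaPipe(msg string, size int) string {
	pr, pw := io.Pipe()
	go func() {
		io.WriteString(pw, msg)
		pw.Close()
	}()
	return readInChunks(pr, size)
}

func main() {
	fmt.Printf("emitter: %q\n", viaEmitter("hello world", 4)) // emitter: "hell"
	fmt.Printf("pipe:    %q\n", viaPipe("hello world", 4))    // pipe:    "hello world"
}
```

bufio.Scanner starts with a buffer of several kilobytes, so the loss does not show up with short test strings, but it can with larger writes or other readers.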
Upvotes: 3