Reputation: 2568
I'm exploring Go and trying to set up a simple pipeline using channels. I just want to read values in main() and send them to process() for processing; in this case, that just means printing each value to the screen.
Unfortunately, in the code below, it appears that process() never reads from the channel, or at least it doesn't print anything. What am I doing wrong?
package main

import (
	"database/sql"
	"fmt"
	"time"

	_ "github.com/lib/pq"
	"gopkg.in/redis.v3"
	// "strconv"
)

type Record struct {
	userId, myDate int
	prodUrl        string
}

func main() {
	//connect to db
	db, err := sql.Open(...)
	defer db.Close()
	//error check here...

	//exec query
	rows, err := db.Query("select userID,url,date from mytable limit 10")
	defer rows.Close()
	//error check here...

	//create channel to buffer rows read
	bufferChan := make(chan *Record, 1000)
	go process(bufferChan)

	//iterate through results and send them to process()
	row := new(Record)
	for rows.Next() {
		err := rows.Scan(&row.userId, &row.prodUrl, &row.myDate)
		bufferChan <- row
		fmt.Printf("row sent %v", row.userId)
	}
}

//prints Record values
func process(buffer chan *Record) {
	row := <-buffer
	fmt.Printf("row received: %d %v %d ", row.userId, row.prodUrl, row.myDate)
}
Upvotes: 1
Views: 820
Reputation: 993
I believe you are looking for io.Pipe(), the Go API that creates a synchronous in-memory pipe between a writer and one or more readers. There is no internal buffering. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.
In your case, the io.PipeWriter belongs to the code "reading values from the database" and the io.PipeReader to the code "writing the values to the screen".
Here is an example of streaming data without an intermediate buffer such as bytes.Buffer:
// Set up the pipe to write data directly into the Reader.
pr, pw := io.Pipe()

// Write JSON-encoded data to the Writer end of the pipe.
// Write in a separate concurrent goroutine, and remember
// to Close the PipeWriter, to signal to the paired PipeReader
// that we're done writing.
go func() {
	// CloseWithError(nil) behaves like Close; a non-nil encoding
	// error is handed to the reader instead of being dropped.
	err := json.NewEncoder(pw).Encode(&v)
	pw.CloseWithError(err)
}()

// Send the HTTP request. Whatever is read from the Reader
// will be sent in the request body.
// As data is written to the Writer, it will be available
// to read from the Reader.
resp, err := http.Post("http://example.com", "application/json", pr)
Reference:
https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5
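For anyone who wants to try the pipe pattern without an HTTP server, here is a minimal self-contained sketch. The Record type, its JSON tags, and the streamJSON helper are my own hypothetical names, not part of the question or the linked article; the reader side simply collects everything with io.ReadAll.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// Record is a hypothetical payload type used only for this illustration.
type Record struct {
	UserID  int    `json:"userId"`
	ProdURL string `json:"prodUrl"`
}

// streamJSON (hypothetical helper) encodes v into the write end of a pipe
// in a separate goroutine and returns everything read from the read end.
func streamJSON(v interface{}) (string, error) {
	pr, pw := io.Pipe()

	go func() {
		// CloseWithError(nil) behaves like Close, so the reader sees
		// io.EOF on success and the encoding error otherwise.
		pw.CloseWithError(json.NewEncoder(pw).Encode(v))
	}()

	// Each Write on pw blocks until the data has been consumed here.
	data, err := io.ReadAll(pr)
	if err != nil {
		return "", fmt.Errorf("reading from pipe: %w", err)
	}
	return string(data), nil
}

func main() {
	out, err := streamJSON(Record{UserID: 1, ProdURL: "http://example.com/a"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```

Note that a pipe moves bytes, not typed values, so for passing *Record values between goroutines a channel is usually the more direct fit.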
Upvotes: 1
Reputation: 49251
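Your main function exits, so the whole program ends before process gets a chance to run. main should wait for processing to finish. Moreover, process should loop over the channel with the range keyword so that it handles every value rather than only the first one.
The scaffolding for a working solution looks like this: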
package main

import "fmt"

func process(input chan int, done chan struct{}) {
	for i := range input {
		fmt.Println(i)
	}
	done <- struct{}{}
}

func main() {
	input := make(chan int)
	done := make(chan struct{})

	go process(input, done)

	for i := 1; i < 10; i++ {
		input <- i
	}
	close(input)

	<-done
}
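Applied to the Record type from the question, the same scaffolding might look roughly like the sketch below. There is no database here: the sample slice and the sendAll helper are hypothetical stand-ins for the rows.Next() loop. The sketch also allocates a fresh Record per row, because the question reuses a single *Record for every send, which can let a later Scan overwrite a row before the receiver has read it.

```go
package main

import "fmt"

// Record mirrors the struct from the question.
type Record struct {
	userId, myDate int
	prodUrl        string
}

// process prints every Record until the channel is closed, then reports
// how many rows it handled on the done channel.
func process(buffer <-chan *Record, done chan<- int) {
	count := 0
	for row := range buffer {
		fmt.Printf("row received: %d %v %d\n", row.userId, row.prodUrl, row.myDate)
		count++
	}
	done <- count
}

// sendAll is a hypothetical stand-in for the rows.Next() loop: it sends
// each sample row to process and waits until all of them are handled.
func sendAll(rows []Record) int {
	bufferChan := make(chan *Record, 1000)
	done := make(chan int)

	go process(bufferChan, done)

	for _, r := range rows {
		row := new(Record) // fresh Record per row, never shared between sends
		*row = r
		bufferChan <- row
	}
	close(bufferChan) // lets the range loop in process terminate

	return <-done // blocks until process has drained the channel
}

func main() {
	// Made-up sample data in place of query results.
	sample := []Record{
		{userId: 1, myDate: 20150101, prodUrl: "http://example.com/a"},
		{userId: 2, myDate: 20150102, prodUrl: "http://example.com/b"},
	}
	fmt.Println("rows processed:", sendAll(sample))
}
```

With a real query, the body of the loop would be the rows.Scan call into the freshly allocated Record, followed by the send.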
Upvotes: 1
Reputation: 1403
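The reason func process does not print anything is that your func main exits as soon as the for rows.Next() loop finishes, which ends the program. You need to do a couple of things: close the channel once everything has been sent, have process range over the channel, and make main wait until process signals that it is done.
Look at the code snippet below for an example: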
package main

import "fmt"

func main() {
	bufferChan := make(chan int, 1000)
	done := make(chan bool)

	go process(bufferChan, done)

	for i := 0; i < 100; i++ {
		bufferChan <- i
	}
	close(bufferChan)

	// Block until process signals that it has drained the channel.
	<-done
	fmt.Println("Done")
}

func process(c chan int, done chan bool) {
	for s := range c {
		fmt.Println(s)
	}
	done <- true
}
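As a variation on the done channel, sync.WaitGroup from the standard library can do the waiting instead. This is a different technique from the snippet above, shown only as a sketch; the run helper and the running total are hypothetical additions so the result can be checked.

```go
package main

import (
	"fmt"
	"sync"
)

// process prints every value until the channel is closed and adds it to
// total, then marks itself finished on the WaitGroup.
func process(c <-chan int, wg *sync.WaitGroup, total *int) {
	defer wg.Done()
	for s := range c {
		fmt.Println(s)
		*total += s
	}
}

// run (hypothetical helper) sends 0..n-1 through the channel and returns
// the sum that process saw.
func run(n int) int {
	bufferChan := make(chan int, 1000)
	var wg sync.WaitGroup
	total := 0

	wg.Add(1) // must happen before the goroutine starts
	go process(bufferChan, &wg, &total)

	for i := 0; i < n; i++ {
		bufferChan <- i
	}
	close(bufferChan)

	// Wait returns only after process has called Done, so reading
	// total afterwards is safe.
	wg.Wait()
	return total
}

func main() {
	fmt.Println("Done, sum =", run(100))
}
```

A WaitGroup tends to pay off once there are several worker goroutines reading from the same channel; for a single consumer the done channel is just as good.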
Upvotes: 2