jmng

Reputation: 2568

Go pipeline using channels

I'm exploring Go and trying to set up a simple pipeline using channels. I just want to read rows in main() and send them to process() for processing, which in this case just prints the values to the screen.

Unfortunately, in the code below, it appears that process() never reads from the channel, or at least it doesn't print anything; what am I doing wrong?

package main

import ( "fmt" ; "database/sql" ; _ "github.com/lib/pq" ; "time" ; "gopkg.in/redis.v3" )//; "strconv" )

type Record struct {
    userId, myDate int
    prodUrl string
}


func main(){

    //connect to db
    db, err := sql.Open(...)
    defer db.Close()

    //error check here...

    //exec query
    rows, err := db.Query("select userID,url,date from mytable limit 10")
    defer rows.Close()

    //error check here...   

    //create channel to buffer rows read
    bufferChan := make(chan *Record,1000)
    go process(bufferChan)

    //iterate through results and send them to process()
    row := new(Record)
    for rows.Next(){
        err := rows.Scan(&row.userId, &row.prodUrl, &row.myDate)        
        bufferChan <- row
        fmt.Printf("row sent %v",row.userId)                    
    }   
}

//prints Record values
func process (buffer chan *Record) {
    row := <- buffer
    fmt.Printf("row received: %d %v %d ", row.userId,row.prodUrl,row.myDate)
}

Upvotes: 1

Views: 820

Answers (3)

Arif A.

Reputation: 993

I believe you are looking for the io.Pipe() API, which creates a synchronous in-memory pipe between a writer and a reader. There is no internal buffering. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.

In your case, the io.PipeWriter side is the code reading values from the database, and the io.PipeReader side is the code writing the values to the screen.

Here is an example of streaming data without an intermediate buffer such as bytes.Buffer:

// Set up the pipe to write data directly into the Reader.
pr, pw := io.Pipe()
// Write JSON-encoded data to the Writer end of the pipe.
// Write in a separate concurrent goroutine, and remember
// to Close the PipeWriter, to signal to the paired PipeReader
// that we're done writing.
go func() {
  err := json.NewEncoder(pw).Encode(&v)
  // CloseWithError(nil) acts like Close; a non-nil error is passed on to the reader.
  pw.CloseWithError(err)
}()
// Send the HTTP request. Whatever is read from the Reader
// will be sent in the request body.
// As data is written to the Writer, it will be available
// to read from the Reader.
resp, err := http.Post("http://example.com", "application/json", pr)
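
For a version that can be run on its own, here is a minimal sketch of the same idea that reads the pipe back into a string instead of sending an HTTP request. The function name streamJSON and the sample map are made up for illustration and are not part of the question's code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// streamJSON (hypothetical helper) encodes v into the write end of an
// io.Pipe in one goroutine and reads everything from the read end.
func streamJSON(v interface{}) (string, error) {
	pr, pw := io.Pipe()
	go func() {
		// Closing the writer is what lets the reader see EOF.
		// CloseWithError(nil) behaves like Close.
		pw.CloseWithError(json.NewEncoder(pw).Encode(v))
	}()
	data, err := io.ReadAll(pr)
	return string(data), err
}

func main() {
	out, err := streamJSON(map[string]int{"userId": 1})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```

Note that the pipe carries bytes, so each Record would have to be encoded on one side and decoded on the other; a channel passes the values directly.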

Reference:

https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5

Upvotes: 1

Grzegorz Żur

Reputation: 49251

Your main function exits, so the whole program ends without waiting for process. main should wait for processing to finish. Moreover, the process function should loop over the channel with the range keyword.

Scaffolding for a working solution looks like this:

package main

import "fmt"

func process(input chan int, done chan struct{}) {
    for i := range input {
        fmt.Println(i)
    }
    done <- struct{}{}
}

func main() {
    input := make(chan int)
    done := make(chan struct{})

    go process(input, done)

    for i := 1; i < 10; i++ {
        input <- i
    }
    close(input)

    <-done
}


Upvotes: 1

Prashant Thakkar

Reputation: 1403

The reason func process does not print anything is that your func main exits after the for loop over rows.Next finishes, thereby exiting the program. You need to do a couple of things.

  1. Call close(bufferChan) after the for loop to signal that no more messages will be added to the buffered channel; otherwise a range loop over the channel never ends, which can lead to deadlock.
  2. Use range to iterate over the channel in your func process.
  3. Pass an additional channel to process to signal when it finishes, so that main can wait until process is done.

Look at the code snippet below for example:

package main

import "fmt"

func main() {
    bufferChan := make(chan int, 1000)
    done := make(chan bool)
    go process(bufferChan, done)
    for i := 0; i < 100; i++ {
        bufferChan <- i
    }
    close(bufferChan)

    // Block until process signals that it has finished.
    <-done
    fmt.Println("Done")

}

func process(c chan int, done chan bool) {
    for s := range c {
        fmt.Println(s)
    }   
    done <- true

}
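
Applied to the Record type from the question, the same three steps could look roughly like the sketch below. The rows slice is a stand-in for the rows.Next() loop, and runPipeline and the sample values are made up for illustration. It also allocates a new Record per iteration, on the assumption that you do not want the sender to overwrite a value the receiver has not printed yet (the question reuses one pointer for every row).

```go
package main

import "fmt"

// Record mirrors the struct from the question.
type Record struct {
	userId, myDate int
	prodUrl        string
}

// process formats every record until the channel is closed, then
// hands the collected lines back, which also signals completion.
func process(buffer <-chan *Record, results chan<- []string) {
	var lines []string
	for row := range buffer {
		lines = append(lines, fmt.Sprintf("row received: %d %v %d", row.userId, row.prodUrl, row.myDate))
	}
	results <- lines
}

// runPipeline (hypothetical helper) sends each row to process and
// waits for it to finish.
func runPipeline(rows []Record) []string {
	bufferChan := make(chan *Record, 1000)
	results := make(chan []string)
	go process(bufferChan, results)

	for _, r := range rows {
		row := new(Record) // fresh Record for every row sent
		*row = r
		bufferChan <- row
	}
	close(bufferChan) // lets the range loop in process terminate

	return <-results // blocks until process has drained the channel
}

func main() {
	rows := []Record{
		{userId: 1, myDate: 20160101, prodUrl: "http://example.com/a"},
		{userId: 2, myDate: 20160102, prodUrl: "http://example.com/b"},
	}
	for _, line := range runPipeline(rows) {
		fmt.Println(line)
	}
}
```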

Upvotes: 2
