lightweight
lightweight

Reputation: 3327

using channels with google pubsub poll subscriber

I'm trying to create a google pubsub subscriber in golang where I take 100 messages at at a time and then write them to influx. I'm trying to use channels to do this like this:

package main

import (
    "os"
    "fmt"
    "cloud.google.com/go/pubsub"
    "log"
    "sync"
    "golang.org/x/net/context"
    "encoding/json"
    clnt "github.com/influxdata/influxdb/client/v2"
    "time"
)

type SensorData struct {
    Pressure      float64 `json:"pressure"`
    Temperature   float64 `json:"temperature"`
    Dewpoint      float64 `json:"dewpoint"`
    Timecollected int64   `json:"timecollected"`
    Latitude      float64 `json:"latitude"`
    Longitude     float64 `json:"longitude"`
    Humidity      float64 `json:"humidity"`
    SensorID      string  `json:"sensorId"`
    Zipcode       int     `json:"zipcode"`
    Warehouse     string  `json:"warehouse"`
    Area          string  `json:"area"`
}

type SensorPoints struct {
    SensorData      []SensorData
}

func main () {

    messages := make(chan SensorData, 100)

    // Create a new Influx HTTPClient
    c, err := clnt.NewHTTPClient(clnt.HTTPConfig{
        Addr:     "http://localhost:8086",
        Username: "user",
        Password: "pass",
    })
    if err != nil {
        log.Fatal(err)
    }


    // Create pubsub subscriber
    ctx := context.Background()
    proj := os.Getenv("GOOGLE_CLOUD_PROJECT")
    if proj == "" {
        fmt.Fprintf(os.Stderr, "GOOGLE_CLOUD_PROJECT environment variable must be set.\n")
        os.Exit(1)
    }
    client, err := pubsub.NewClient(ctx, proj)
    if err != nil {
        log.Fatalf("Could not create pubsub Client: %v", err)
    }
    const sub = "influxwriter"


    //create influx a blank batchpoint set
    bp, err := clnt.NewBatchPoints(clnt.BatchPointsConfig{
        Database:  "sensordata",
        Precision: "s",
    })
    if err != nil {
        log.Fatal(err)
    }



    // Pull messages via the subscription.
    go pullMsgs(client, sub, messages)
    if err != nil {
        log.Fatal(err)
    }

    writeInflux(messages, bp)

    c.Close()

}


func pullMsgs(client *pubsub.Client, name string, messages chan<- SensorData) {
    ctx := context.Background()

    // [START pubsub_subscriber_async_pull]
    // [START pubsub_quickstart_subscriber]
    // Consume 10 messages.

    var mu sync.Mutex
    var sensorinfos SensorPoints
    sensorinfo := &SensorData{}

    received := 0
    sub := client.Subscription(name)
    cctx, _ := context.WithCancel(ctx)
    err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
        msg.Ack()

        json.Unmarshal(msg.Data, sensorinfo)

        //fmt.Println(string(msg.Data))
        //fmt.Println(sensorinfo.SensorID)

        sensorinfos.SensorData = append(sensorinfos.SensorData, *sensorinfo)

        mu.Lock()
        defer mu.Unlock()
        received++
        fmt.Println("rcv: ", received)
        messages <- *sensorinfo

    })
    if err != nil {
        fmt.Println(err)
    }
    // [END pubsub_subscriber_async_pull]
    // [END pubsub_quickstart_subscriber]
}

func writeInflux(sensorpoints <- chan SensorData, bp clnt.BatchPoints) {

    for p := range sensorpoints {

        // Create a point and add to batch
        tags := map[string]string{
            "sensorId": p.SensorID,
            "warehouse": p.Warehouse,
            "area": p.Area,
            "zipcode": string(p.Zipcode),
        }

        fields := map[string]interface{}{
            "pressure":   p.Pressure,
            "humidity": p.Humidity,
            "temperature":   p.Temperature,
            "dewpoint":   p.Dewpoint,
            "longitude":   p.Longitude,
            "latitude":   p.Latitude,
        }

        pt, err := clnt.NewPoint("sensordata", tags, fields, time.Unix(p.Timecollected, 0))
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)


    }


}

but it doesn't see to every get past the initial pullMsgs function and just keeps printing the output in there:

rcv:  1
rcv:  2
rcv:  3
rcv:  4
rcv:  5
rcv:  6
rcv:  7

I thought that once the channel get full, it should block until the channel is emptied out

this is the pubsub pull code I'm using as a reference.

Upvotes: 0

Views: 1340

Answers (1)

jrefior
jrefior

Reputation: 4421

When you've sent the desired number of messages on the channel, close the channel and cancel the context. Try using the technique demonstrated in the documentation of canceling after some number of messages. Since your buffer is 100 and you're trying to consume 100 messages at a time, that's the number. If you want your program to exit, close the channel so that the for e := range ch loop in writeInflux hits a stopping point and doesn't block waiting for more elements to be added to the channel.

Note this in the Go pubsub API doc:

To terminate a call to Receive, cancel its context.

That is not what's stalling your main goroutine, but your pullMsgs goroutine will not exit on its own without that cancel.

Also, check for errors on Unmarshal. If you don't want to handle unmarshal errors at this point in the code, consider changing the channel type and sending msg or msg.Data instead and unmarshaling upon channel receipt.

cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
    msg.Ack()
    err := json.Unmarshal(msg.Data, sensorinfo)
    if err != nil {
         fmt.Printf("Failed to unmarshal: %s\n", err)
    }
    mu.Lock()
    defer mu.Unlock()
    received++
    fmt.Println("rcv: ", received)
    messages <- *sensorinfo
    if received == 100 {
        close(messages)  // no more messages will be sent on channel
        cancel()
    }

Upvotes: 1

Related Questions