Reputation: 3327
I'm trying to create a google pubsub subscriber in golang where I take 100 messages at at a time and then write them to influx. I'm trying to use channels to do this like this:
package main
import (
"os"
"fmt"
"cloud.google.com/go/pubsub"
"log"
"sync"
"golang.org/x/net/context"
"encoding/json"
clnt "github.com/influxdata/influxdb/client/v2"
"time"
)
type SensorData struct {
Pressure float64 `json:"pressure"`
Temperature float64 `json:"temperature"`
Dewpoint float64 `json:"dewpoint"`
Timecollected int64 `json:"timecollected"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Humidity float64 `json:"humidity"`
SensorID string `json:"sensorId"`
Zipcode int `json:"zipcode"`
Warehouse string `json:"warehouse"`
Area string `json:"area"`
}
type SensorPoints struct {
SensorData []SensorData
}
func main () {
messages := make(chan SensorData, 100)
// Create a new Influx HTTPClient
c, err := clnt.NewHTTPClient(clnt.HTTPConfig{
Addr: "http://localhost:8086",
Username: "user",
Password: "pass",
})
if err != nil {
log.Fatal(err)
}
// Create pubsub subscriber
ctx := context.Background()
proj := os.Getenv("GOOGLE_CLOUD_PROJECT")
if proj == "" {
fmt.Fprintf(os.Stderr, "GOOGLE_CLOUD_PROJECT environment variable must be set.\n")
os.Exit(1)
}
client, err := pubsub.NewClient(ctx, proj)
if err != nil {
log.Fatalf("Could not create pubsub Client: %v", err)
}
const sub = "influxwriter"
//create influx a blank batchpoint set
bp, err := clnt.NewBatchPoints(clnt.BatchPointsConfig{
Database: "sensordata",
Precision: "s",
})
if err != nil {
log.Fatal(err)
}
// Pull messages via the subscription.
go pullMsgs(client, sub, messages)
if err != nil {
log.Fatal(err)
}
writeInflux(messages, bp)
c.Close()
}
func pullMsgs(client *pubsub.Client, name string, messages chan<- SensorData) {
ctx := context.Background()
// [START pubsub_subscriber_async_pull]
// [START pubsub_quickstart_subscriber]
// Consume 10 messages.
var mu sync.Mutex
var sensorinfos SensorPoints
sensorinfo := &SensorData{}
received := 0
sub := client.Subscription(name)
cctx, _ := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
json.Unmarshal(msg.Data, sensorinfo)
//fmt.Println(string(msg.Data))
//fmt.Println(sensorinfo.SensorID)
sensorinfos.SensorData = append(sensorinfos.SensorData, *sensorinfo)
mu.Lock()
defer mu.Unlock()
received++
fmt.Println("rcv: ", received)
messages <- *sensorinfo
})
if err != nil {
fmt.Println(err)
}
// [END pubsub_subscriber_async_pull]
// [END pubsub_quickstart_subscriber]
}
func writeInflux(sensorpoints <- chan SensorData, bp clnt.BatchPoints) {
for p := range sensorpoints {
// Create a point and add to batch
tags := map[string]string{
"sensorId": p.SensorID,
"warehouse": p.Warehouse,
"area": p.Area,
"zipcode": string(p.Zipcode),
}
fields := map[string]interface{}{
"pressure": p.Pressure,
"humidity": p.Humidity,
"temperature": p.Temperature,
"dewpoint": p.Dewpoint,
"longitude": p.Longitude,
"latitude": p.Latitude,
}
pt, err := clnt.NewPoint("sensordata", tags, fields, time.Unix(p.Timecollected, 0))
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
}
}
but it doesn't see to every get past the initial pullMsgs
function and just keeps printing the output in there:
rcv: 1
rcv: 2
rcv: 3
rcv: 4
rcv: 5
rcv: 6
rcv: 7
I thought that once the channel get full, it should block until the channel is emptied out
this is the pubsub pull code I'm using as a reference.
Upvotes: 0
Views: 1340
Reputation: 4421
When you've sent the desired number of messages on the channel, close the channel and cancel the context. Try using the technique demonstrated in the documentation of canceling after some number of messages. Since your buffer is 100 and you're trying to consume 100 messages at a time, that's the number. If you want your program to exit, close the channel so that the for e := range ch
loop in writeInflux
hits a stopping point and doesn't block waiting for more elements to be added to the channel.
Note this in the Go pubsub API doc:
To terminate a call to Receive, cancel its context.
That is not what's stalling your main goroutine, but your pullMsgs
goroutine will not exit on its own without that cancel.
Also, check for errors on Unmarshal
. If you don't want to handle unmarshal errors at this point in the code, consider changing the channel type and sending msg
or msg.Data
instead and unmarshaling upon channel receipt.
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
err := json.Unmarshal(msg.Data, sensorinfo)
if err != nil {
fmt.Printf("Failed to unmarshal: %s\n", err)
}
mu.Lock()
defer mu.Unlock()
received++
fmt.Println("rcv: ", received)
messages <- *sensorinfo
if received == 100 {
close(messages) // no more messages will be sent on channel
cancel()
}
Upvotes: 1