I'm trying to use the Go Beam SDK to create a pipeline that processes Pub/Sub messages.
github.com/apache/beam/sdks/v2/go/pkg/beam
I understand that the pubsubio connector makes external calls and works only on the Dataflow runner.
What if I want to test my pipeline locally? How would you do that?
I need to understand what prevents me from writing my own unbounded Pub/Sub source. (I may not understand how Beam works under the hood, for example how it serializes user-defined code to send it to the runner.)
I tried something like this:
package pubsubio

import (
	"context"
	"fmt"

	cloud_pubsub "cloud.google.com/go/pubsub"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)

func init() {
	register.DoFn3x1[context.Context, string, func(*cloud_pubsub.Message), error](&readFn{})
	register.Emitter1[*cloud_pubsub.Message]()
}

type ReadConfig struct {
	ProjectID        string
	TopicName        string
	SubscriptionName string
}

func Read(
	scope beam.Scope,
	cfg ReadConfig,
) beam.PCollection {
	scope = scope.Scope("pubsubio.Read")
	col := beam.Create(scope, cfg.SubscriptionName)
	return beam.ParDo(scope, newReadFn(cfg.ProjectID, cfg.TopicName), col)
}

type readFn struct {
	pubsubFn
	TopicName string
}

func newReadFn(projectID, topicName string) *readFn {
	return &readFn{
		pubsubFn: pubsubFn{
			ProjectID: projectID,
		},
		TopicName: topicName,
	}
}

func (fn *readFn) ProcessElement(
	ctx context.Context,
	subscriptionName string,
	emit func(message *cloud_pubsub.Message),
) error {
	log.Info(ctx, "[pubsubio.ProcessElement] Reading from pubsub")
	_, err := pubsubx.EnsureTopic(ctx, fn.client, fn.TopicName)
	if err != nil {
		return fmt.Errorf("cannot get topic: %w", err)
	}
	sub, err := pubsubx.EnsureSubscription(ctx, fn.client, fn.TopicName, subscriptionName)
	if err != nil {
		return fmt.Errorf("cannot get subscription: %w", err)
	}
	return sub.Receive(ctx, func(ctx context.Context, message *cloud_pubsub.Message) {
		emit(message)
		log.Debugf(ctx, "[pubsubio.ProcessElement] Emit msg: %s", message.ID)
		message.Ack()
	})
}
So basically I created a Read fn that never returns, but the rest of my pipeline is never triggered (I must be missing something).