Reputation: 41
I hope everyone's Friday is going well. I am desperately looking for some help with the Apache Beam Go SDK (https://github.com/apache/beam/tree/master/sdks). I've written a pipeline with it to process PubSub events and got to the stage where my workers start up nicely, but no messages are being consumed from PubSub. I've tried running the example provided in the SDK (streaming_wordcap), which uses the same pubsubio, and the result is the same: no messages from the newly created topics are being consumed. I wonder if there is an extra option I should be enabling, or some deployment-specific flag? I am a little bit lost now.
There are messages in the subscription (a few million). When I performed an experiment and changed the subscription name to something that doesn't exist, I saw errors in the Dataflow logs. Otherwise there are no errors and no info apart from the generic Dataflow debug output:
2022-07-08T11:21:31.793474125Z Starting 3 workers in europe-west4-a...
2022-07-08T11:21:31.820662575Z Starting worker pool setup.
2022-07-08T11:22:00.789962383Z Autoscaling: Raised the number of workers to 3 so that the pipeline can catch up with its backlog and keep up with its input rate.
2022-07-08T11:22:50.806937837Z Workers have started successfully.
Here is a part of my pipeline code:
var (
    inputTopic        = flag.String("topic", "", "PubSub input topic (required).")
    inputSubscription = flag.String("inputSubscription", "", "PubSub input subscription (required).")
    outputTableSpec   = flag.String("outputTableSpec", "", "Output BQ table (required).")
)

func init() {
    beam.RegisterType(reflect.TypeOf((*event.Envelope)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*decodeEnvelopeJSONFunc)(nil)).Elem())
    [...]
}

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    if err := validateFlags(); err != nil {
        log.Exit(ctx, err.Error())
    }

    project := gcpopts.GetProject(ctx)
    p, s := beam.NewPipelineWithRoot()

    pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
        Subscription: *inputSubscription, WithAttributes: false, IDAttribute: "", TimestampAttribute: "",
    })
    eventMapper := DecodeAndMap(s, pubSubMessages)
    bigqueryio.Write(s, project, *outputTableSpec, eventMapper)

    if err := beamx.Run(ctx, p); err != nil {
        log.Fatalf(ctx, "failed to execute job: %v", err)
    }
}

func DecodeAndMap(s beam.Scope, messages beam.PCollection) beam.PCollection {
    s = s.Scope("DecodeAndMap")
    events := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, messages)
    return beam.ParDo(s, &mapPayloadFunc{}, events)
}

type decodeEnvelopeJSONFunc struct{}

func (f *decodeEnvelopeJSONFunc) ProcessElement(ctx context.Context, msg []byte, emit func(*event.Envelope)) error {
    var e event.Envelope
    log.Infoln(ctx, "decoding envelope")
    if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&e); err != nil {
        return fmt.Errorf("failed to decode envelope: %w", err)
    }
    log.Infoln(ctx, "emitting envelope")
    emit(&e)
    return nil
}
[...]
Here is how I am deploying my pipeline:
go run ./pkg/my-mapper/. \
--runner dataflow \
--job_name my-mapper \
--project mb-gcp-project \
--region europe-west4 --zone europe-west4-a \
--temp_location gs://my-beam-tmp-data-bucket/tmp/ \
--staging_location gs://my-beam-tmp-data-bucket/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest \
--subnetwork regions/europe-west4/subnetworks/my-subnetwork \
--num_workers 3 \
--max_num_workers 10 \
--async --update \
--topic my-topic-name --inputSubscription my-sub-name --outputTableSpec my-gcp-project:my_dataset.mapped_data
2022/07/08 12:16:33 Cross-compiling ... as /tmp/worker-1-1657278993706049280
[...]
"type": "JOB_TYPE_STREAMING"
}
2022/07/08 12:20:11 Submitted job: 2022-07-08_04_20_11-11918574995509384496
Upvotes: 4
Views: 689
Reputation: 111
This looks like a windowing problem. It's a streaming pipeline that isn't windowing its data before writing out, which means it's trying to aggregate in the default Global Window, and it never terminates or Acks the messages.
Add a beam.WindowInto step at some granularity after the PubSub step, either with plain windowing or with triggers. See the Programming Guide sections on Windowing and Triggers for more info: https://beam.apache.org/documentation/programming-guide/#windowing
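As a minimal sketch against the pipeline in the question (it reuses the asker's pubSubMessages / DecodeAndMap names; the 1-minute fixed window size is an arbitrary choice, not a recommendation):

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
)

// ... inside main(), after the pubsubio.Read step:
pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
    Subscription: *inputSubscription,
})

// Window the unbounded stream into fixed 1-minute windows so downstream
// aggregations (such as the BigQuery write) can fire once per window instead
// of waiting for the end of the never-ending global window.
// (Triggers can be attached here as additional WindowInto options if you
// need finer control over when panes fire.)
windowed := beam.WindowInto(s, window.NewFixedWindows(time.Minute), pubSubMessages)

eventMapper := DecodeAndMap(s, windowed)
bigqueryio.Write(s, project, *outputTableSpec, eventMapper)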
This will allow any aggregation operation in the bigqueryio.Write step to actually complete, regardless of whether it's the "xlang" version or not.
At this time, I'd recommend using the xlang version, as that has been validated for Streaming use. The Go native one hasn't been tested and vetted to work properly for Streaming writes, and there could be issues as a result.
E.g. if you're using the native bigqueryio, your pipeline is getting held up here, as the write waits for the end of the global window: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/bigqueryio/bigquery.go#L207
Otherwise, here are some things to look into though:
- Make sure the fields of event.Envelope are all Exported. Otherwise they won't be serialized by either JSON or the beam native Schema encoding (there's a small illustration at the end of this answer). I can't see this being the root problem however.
- You shouldn't need to set the worker_harness_container_image flag, and in fact that could cause worker startup issues. While historically the Beam Go container hasn't changed much, that's no longer the case as additional features are being added, which require coordination between the container bootloader and the worker harness code.

Cheers!
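On the exported-fields point, here is a small hypothetical sketch (the real event.Envelope isn't shown in the question, so these field names are made up purely for illustration). The key is that every field you want decoded from JSON and carried through the pipeline starts with an uppercase letter:

// Hypothetical shape of event.Envelope, for illustration only.
package event

import "encoding/json"

type Envelope struct {
    ID      string          `json:"id"`      // exported: decoded from JSON, encodable by Beam
    Type    string          `json:"type"`    // exported
    Payload json.RawMessage `json:"payload"` // exported
    // payload []byte // unexported: silently skipped by encoding/json and by Beam's coders
}

The beam.RegisterType call in the question's init() can stay exactly as it is.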
Upvotes: 1