Michał Suchwałko

Reputation: 41

Apache Beam / Dataflow GoSDK Pipeline doesn't process any pubsub messages

I hope everyone's Friday is going well. I am desperately looking for some help with the Apache Beam Go SDK (https://github.com/apache/beam/tree/master/sdks). I've written a pipeline with it to process PubSub events and got to a stage where my workers start up nicely, but no messages are consumed from PubSub. I've tried running the example provided in the SDK (streaming_wordcap), which uses the same pubsubio, and the result is the same: no messages from the newly created topics are consumed. I wonder if there is an extra option that I should be enabling, or a deployment-specific flag? I am a little bit lost now.

There are messages in the subscription (a few million). As an experiment I changed the subscription name to something that doesn't exist, and then I did see errors in the Dataflow logs. Otherwise there are no errors and no info apart from the generic Dataflow debug output.

2022-07-08T11:21:31.793474125Z Starting 3 workers in europe-west4-a...
2022-07-08T11:21:31.820662575Z Starting worker pool setup.
2022-07-08T11:22:00.789962383Z Autoscaling: Raised the number of workers to 3 so that the pipeline can catch up with its backlog and keep up with its input rate.
2022-07-08T11:22:50.806937837Z Workers have started successfully.

Here is a part of my pipeline code:

var (
    inputTopic        = flag.String("topic", "", "PubSub input topic (required).")
    inputSubscription = flag.String("inputSubscription", "", "PubSub input subscription (required).")
    outputTableSpec   = flag.String("outputTableSpec", "", "Output BQ table (required).")
)

func init() {
    beam.RegisterType(reflect.TypeOf((*event.Envelope)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*decodeEnvelopeJSONFunc)(nil)).Elem())
    [...]
}

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    if err := validateFlags(); err != nil {
        log.Exit(ctx, err.Error())
    }

    project := gcpopts.GetProject(ctx)
    p, s := beam.NewPipelineWithRoot()

    pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
        Subscription: *inputSubscription, WithAttributes: false, IDAttribute: "", TimestampAttribute: "",
    })

    eventMapper := DecodeAndMap(s, pubSubMessages)

    bigqueryio.Write(s, project, *outputTableSpec, eventMapper)

    if err := beamx.Run(ctx, p); err != nil {
        log.Fatalf(ctx, "failed to execute job: %v", err)
    }
}

func DecodeAndMap(s beam.Scope, messages beam.PCollection) beam.PCollection {
    s = s.Scope("DecodeAndMap")
    events := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, messages)
    return beam.ParDo(s, &mapPayloadFunc{}, events)
}

type decodeEnvelopeJSONFunc struct{}

func (f *decodeEnvelopeJSONFunc) ProcessElement(ctx context.Context, msg []byte, emit func(*event.Envelope)) error {
    var e event.Envelope
    log.Infoln(ctx, "decoding envelope")

    if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&e); err != nil {
        return fmt.Errorf("failed to decode envelope: %w", err)
    }

    log.Infoln(ctx, "emitting envelope")
    emit(&e)

    return nil
}

[...]

Here is how I am deploying my pipeline:

go run ./pkg/my-mapper/. \
    --runner dataflow \
    --job_name my-mapper \
    --project mb-gcp-project \
    --region europe-west4 --zone europe-west4-a \
    --temp_location gs://my-beam-tmp-data-bucket/tmp/ \
    --staging_location gs://my-beam-tmp-data-bucket/binaries/ \
    --worker_harness_container_image=apache/beam_go_sdk:latest \
    --subnetwork regions/europe-west4/subnetworks/my-subnetwork \
    --num_workers 3 \
    --max_num_workers 10 \
    --async --update \
    --topic my-topic-name --inputSubscription my-sub-name --outputTableSpec my-gcp-project:my_dataset.mapped_data
2022/07/08 12:16:33 Cross-compiling ... as /tmp/worker-1-1657278993706049280
[...]
  "type": "JOB_TYPE_STREAMING"
}
2022/07/08 12:20:11 Submitted job: 2022-07-08_04_20_11-11918574995509384496

Worker pool running.

Upvotes: 4

Views: 689

Answers (1)

Robert Burke

Reputation: 111

This looks like a windowing problem. It's a streaming pipeline that isn't windowing its data before writing out, which means it's trying to aggregate in the default global window, so it never terminates the aggregation and never acks the messages.

Add a beam.WindowInto step at some granularity after the PubSub step, either with plain windowing or with triggers. See the Programming Guide on windowing and triggers for more info: https://beam.apache.org/documentation/programming-guide/#windowing
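For example, in your main() it could look something like the sketch below. The 1-minute fixed window is an arbitrary choice for illustration, not something the SDK requires; sliding or session windows (or triggers) work too.

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
)

// ... inside main(), after the pubsubio.Read step:

// Window the unbounded PubSub stream into fixed 1-minute windows so that
// downstream aggregations (including the batching inside bigqueryio.Write)
// can fire per window instead of waiting for the never-ending global window.
windowed := beam.WindowInto(s, window.NewFixedWindows(time.Minute), pubSubMessages)

eventMapper := DecodeAndMap(s, windowed)
bigqueryio.Write(s, project, *outputTableSpec, eventMapper)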

This will allow any aggregation in the bigqueryio.Write step to actually complete, regardless of whether it's the "xlang" version or not.

At this time, I'd recommend using the xlang version, as that has been validated for streaming use. The Go-native one hasn't been tested and vetted for streaming writes, and there could be issues as a result.

E.g. if you're using the native bigqueryio, your pipeline is getting held up here, as the pipeline is waiting for the end of the global window: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/bigqueryio/bigquery.go#L207

Otherwise, here are some other things to look into:

  1. Ensure your import paths are using the "sdks/v2" version of the SDK. Imports without the v2 are stuck at 2.32 and aren't expected to work; no other code changes are expected. So: good (latest shown is 2.40) vs. bad (latest shown is 2.32+incompatible); a quick sketch of this and the next point follows this list. Edit: Looking at the screenshot, you're already doing this. Excellent!
  2. Ensure that the fields of event.Envelope are all exported. Otherwise they won't be serialized by either JSON or the Beam-native schema encoding. I can't see this being the root problem, however.
  3. If you're using a released version of the SDK (v2.40.0, v2.39.0, etc.), you don't need to include the worker_harness_container_image flag, and in fact it could cause worker startup issues. While historically the Beam Go container hasn't changed much, that's no longer the case as additional features are being added, which require coordination between the container bootloader and the worker harness code.
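To illustrate points 1 and 2, here's a minimal sketch; the Envelope fields shown are hypothetical, not your actual type:

package event

import "time"

// Point 1: every Beam import should use the v2 module path, e.g.
//
//   github.com/apache/beam/sdks/v2/go/pkg/beam   // good: tracks current releases (2.40.x)
//   github.com/apache/beam/sdks/go/pkg/beam      // bad: frozen at 2.32+incompatible
//
// Point 2: all fields below are exported (capitalized) so both encoding/json
// and Beam's schema/coder inference can see them. Field names are illustrative only.
type Envelope struct {
    ID        string    `json:"id"`
    Type      string    `json:"type"`
    Payload   []byte    `json:"payload"`
    CreatedAt time.Time `json:"created_at"`
}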

Cheers!

Upvotes: 1
