Christophe Bouhier

Reputation: 224

PubsubIO, message exceeding max size: how to perform error handling

We are running a pipeline in GCP Dataflow and ran into the maximum message size of a Pub/Sub message [1]. When this happens, the pipeline lag time starts to build up, eventually grinding to a halt...

This log message was produced in GCP Stackdriver under 'dataflow_step'.

My question: is there a way to define error handling in the pipeline...

.apply(PubsubIO.writeMessages()
                        .to("topic")
                        .withTimestampAttribute(Instant.now().toString()));

with something like

.onError(...perform error handling ...)

in a fluent manner similar to the Java 8 Streams API, which would allow the pipeline to continue with outputs that are within the Pub/Sub limits.

Other solutions to deal with this situation are most welcome.

Thank you, Christophe Bouhier

[1] Could not commit request due to validation error: generic::invalid_argument: Pubsub publish requests are limited to 10MB, rejecting message over 7MB to avoid exceeding limit with byte64 request encoding.

Upvotes: 1

Views: 1253

Answers (1)

Jeff Klukas

Reputation: 1357

For the particular case of PubsubIO on Dataflow, be aware that Dataflow overrides PubsubIO and handles reading and writing messages to Pub/Sub as part of its streaming implementation. I've seen the same error you're discussing show up in logs under "shuffler" rather than "worker" due to this substitution.

I have worked around this same problem by implementing a custom transform before the PubsubIO.write() step. This LimitPayloadSize transform simply checks how many bytes are in the PubsubMessage and only allows through messages with payload less than 7 MB.

There is not currently a fluent API for error handling in transforms, although that's something that has been discussed. For now, the accepted pattern is to define a transform with multiple output collections and then write the collection of failing messages somewhere else (such as GCS via FileIO). You can implement this as a bare DoFn, or you could look at Partition:

PCollectionList<PubsubMessage> limitedPayloads = input
    .apply("Limit payload size",
        Partition.of(2, new PartitionFn<PubsubMessage>() {
          public int partitionFor(PubsubMessage message, int numPartitions) {
            // getPayload() returns a byte[], so use .length for its size
            return message.getPayload().length < 7 * 1000 * 1000 ? 0 : 1;
          }
        }));
limitedPayloads.get(0).apply(PubsubIO.writeMessages()...);
limitedPayloads.get(1).apply(FileIO.write()...);
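The size check inside the PartitionFn above can be sketched as a pure helper, which also makes the threshold testable outside Beam. The class and method names here are hypothetical, not part of any Beam API; the 7 MB threshold leaves headroom under the 10 MB request limit because, as the error message in the question notes, base64 request encoding inflates the payload by roughly a third.

```java
// Hypothetical standalone sketch of the payload-size partitioning logic.
public class PayloadSizePartition {

    // Threshold below the 10 MB Pub/Sub request limit to allow
    // for base64 encoding overhead (~4/3 inflation).
    static final int MAX_PAYLOAD_BYTES = 7 * 1000 * 1000;

    // Returns 0 for messages small enough to publish, 1 for oversized ones,
    // mirroring the partition indices used in the Beam snippet above.
    static int partitionFor(byte[] payload) {
        return payload.length < MAX_PAYLOAD_BYTES ? 0 : 1;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(new byte[1000]));      // small message
        System.out.println(partitionFor(new byte[8_000_000])); // oversized message
    }
}
```

A 7 MB payload encodes to roughly 9.3 MB of base64, which is why the check is applied to the raw payload bytes rather than the encoded request size.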

Upvotes: 3
