Tianyu Lang

Reputation: 51

Side input in a global window as a slowly changing cache: questions

Context: We have some schema files in Cloud Storage. In our Dataflow job, we need to refer to these schema files to transform our data. These schema files change on a daily/weekly basis. Our data source is PubSub, and we window the PubSub messages into fixed windows of 1 minute. The schema files we need fit comfortably into memory; they are about 90 MB in total.
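For reference, the main input looks roughly like this (a minimal sketch; the subscription name is a placeholder and imports are omitted, in the same style as the snippets below):

PCollection<PubsubMessage> messages =
    pipeline
        .apply("ReadPubSub", PubsubIO.readMessages()
            .fromSubscription("projects/my-project/subscriptions/my-sub"))  // placeholder subscription
        .apply("FixedWindow", Window.<PubsubMessage>into(
            FixedWindows.of(Duration.standardMinutes(1))));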

What I have tried: Referring to this doc from Apache Beam, we created a side input in a global window, refreshed by GenerateSequence, like so:

// Creates a side input that refreshes the schema blobs once a day
PCollectionView<Map<String, byte[]>> dataBlobView =
    pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1L)))
        .apply(Window.<Long>into(new GlobalWindows()).triggering(
            Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(ParDo.of(new DoFn<Long, Map<String, byte[]>>() {
          @ProcessElement
          public void processElement(ProcessContext ctx) throws Exception {
            byte[] avroSchemaBlob = getAvroSchema();
            byte[] fileDescriptorSetBlob = getFileDescriptorSet();
            byte[] depsBlob = getFileDescriptorDeps();
            Map<String, byte[]> dataBlobs = ImmutableMap.of(
                "version", Longs.toByteArray(ctx.element().byteValue()),
                "avroSchemaBlob", avroSchemaBlob,
                "fileDescriptorSetBlob", fileDescriptorSetBlob,
                "depsBlob", depsBlob);
            ctx.output(dataBlobs);
          }
        }))
        .apply(View.asSingleton());

"getAvroSchema", "getFileDescriptorSet" and "getFileDescriptorDeps" read files as byte[] from Cloud Storage.

However, this approach failed with the following exception:

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.

I then tried writing my own Combine.globally function like so:

static class GetLatestVersion
    implements SerializableFunction<Iterable<Map<String, byte[]>>, Map<String, byte[]>> {
  @Override
  public Map<String, byte[]> apply(Iterable<Map<String, byte[]>> versions) {
    Map<String, byte[]> result = Maps.newHashMap();
    long maxVersion = Long.MIN_VALUE;
    for (Map<String, byte[]> version : versions) {
      long currentVersion = Longs.fromByteArray(version.get("version"));
      logger.info("Side input version: " + currentVersion);
      if (currentVersion > maxVersion) {
        result = version;
        maxVersion = currentVersion;
      }
    }
    return result;
  }
}
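I applied it roughly like this (a sketch of the wiring; schemaBlobs stands in for the PCollection<Map<String, byte[]>> produced by the ParDo above):

PCollectionView<Map<String, byte[]>> dataBlobView =
    schemaBlobs.apply(Combine.globally(new GetLatestVersion()).asSingletonView());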

But it still triggers the same exception.

I then came across this and this in the Beam email archives, and it seems like what's suggested in the Beam doc does not work: I have to use a MultiMap to avoid the exception I ran into above. With a MultiMap, I would also have to iterate through the values and apply my own logic to pick the desired (latest) value.
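For concreteness, the MultiMap variant would look something like this (a sketch; keyedBlobs is a hypothetical PCollection<KV<String, byte[]>> keyed by blob name):

PCollectionView<Map<String, Iterable<byte[]>>> dataBlobMultimapView =
    keyedBlobs.apply(View.<String, byte[]>asMultimap());

// In the consuming DoFn, values from every fired pane are still present, so I would
// have to iterate c.sideInput(dataBlobMultimapView).get("avroSchemaBlob") and pick
// the latest one myself.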

My questions:

  1. Why do I still get the exception "PCollection with more than one element accessed as a singleton view" even after I globally combine everything into one result?
  2. If I go with the MultiMap approach, wouldn't the job eventually run out of memory? Every day we would basically be growing the MultiMap by 90 MB (the size of our data blobs), unless Dataflow has some smart MultiMap implementation behind the scenes.
  3. What is the recommended way to do this?

Thanks

Upvotes: 2

Views: 1661

Answers (1)

Gabriel Hodoroaga

Reputation: 353

Use .apply(View.asMap()) instead of .apply(View.asSingleton()).

This is the full example:

PCollectionView<Map<String, byte[]>> dataBlobView =
    pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1L)))
        .apply(Window.<Long>into(new GlobalWindows()).triggering(
            Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(ParDo.of(new DoFn<Long, KV<String, byte[]>>() {
          @ProcessElement
          public void processElement(ProcessContext ctx) throws Exception {
            byte[] avroSchemaBlob = getAvroSchema();
            byte[] fileDescriptorSetBlob = getFileDescriptorSet();
            byte[] depsBlob = getFileDescriptorDeps();

            ctx.output(KV.of("version", Longs.toByteArray(ctx.element().byteValue())));
            ctx.output(KV.of("avroSchemaBlob", avroSchemaBlob));
            ctx.output(KV.of("fileDescriptorSetBlob", fileDescriptorSetBlob));
            ctx.output(KV.of("depsBlob", depsBlob));
          }
        }))
        .apply(View.asMap());

You can then use the map from the side input as described in the documentation.
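For example, a consuming DoFn could look roughly like this (a sketch; the main input, here called messages, and the output type are placeholders):

PCollection<String> transformed =
    messages.apply(ParDo.of(new DoFn<PubsubMessage, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Access the schema blobs from the side input map.
        Map<String, byte[]> dataBlobs = c.sideInput(dataBlobView);
        byte[] avroSchemaBlob = dataBlobs.get("avroSchemaBlob");
        // ... decode c.element() using the schema and emit the result ...
      }
    }).withSideInputs(dataBlobView));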

Apache Beam version 2.34.0

Upvotes: 0
