KlaaTh

Reputation: 13

SpannerIO java.lang.IllegalStateException: Sorter should be null here

I am trying to write to Spanner from a Dataflow streaming job, using the following dependency:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.18.0</version>
</dependency>

After mapping the data to a PCollection<Mutation>, I write the mutations to Spanner via SpannerIO.write():

Pipeline pipeline = Pipeline.create(options);

PCollection<Mutation> mutations = pipeline.apply...

mutations.apply("WriteMutations", SpannerIO.write()
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
);

pipeline.run();

However, the job throws the following exception:

java.lang.IllegalStateException: Sorter should be null here
        at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$GatherBundleAndSortFn.startBundle (SpannerIO.java:1080)

What could be causing this exception?

The following pipeline reproduces the exception. I tested it with 20 workers, but the error appears to be independent of the data load.

import com.google.cloud.spanner.Mutation;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import java.util.UUID;

public final class TestPipeline {

    private static final Duration WINDOW_DURATION = Duration.standardSeconds(1);
    private static final String DATABASE_ID = "test";
    private static final String INSTANCE_ID = "test-spanner";
    private static final String TEST_TABLE = "test";

    public static void main(String[] args) {
        TestPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TestPipelineOptions.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
                .apply("Read pubsub", PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(options.getInputSubscription()))
                .apply("Parse message", ParDo.of(new ProcessMessage()))
                .apply("Windowing", Window.<Mutation>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(WINDOW_DURATION)))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes())
                .apply("Write mutations", SpannerIO.write()
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                );

        pipeline.run();
    }

    private static class ProcessMessage extends DoFn<PubsubMessage, Mutation> {

        @ProcessElement
        public void processElement(@Element final PubsubMessage message,
                                   final OutputReceiver<Mutation> out) {
            out.output(Mutation.newInsertOrUpdateBuilder(TEST_TABLE)
                    .set("id").to(UUID.randomUUID().toString())
                    .set("string").to("test")
                    .set("count").to(Long.MAX_VALUE)
                    .build()
            );
        }
    }

    interface TestPipelineOptions extends DataflowPipelineOptions {

        void setInputSubscription(String inputSubscription);

        @Description("Google Pubsub subscription id.")
        String getInputSubscription();
    }

}

The target table is defined as:

CREATE TABLE test (
    id STRING(50) NOT NULL,
    string STRING(50) NOT NULL,
    count INT64
) PRIMARY KEY (id);

Upvotes: 1

Views: 338

Answers (1)

Rose Liu

Reputation: 251

This issue seems to occur with Apache Beam 2.18.0, but not with 2.17.0.

The problem in Apache Beam 2.18.0 is tracked as BEAM-9505: https://issues.apache.org/jira/browse/BEAM-9505
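
Since 2.17 does not seem to show the problem, one possible workaround is to pin the dependency from the question to that version. This is only a sketch: it assumes nothing else in your project requires 2.18.0 and that every other Beam artifact in the build (SDK core, Dataflow runner) is moved to the same version. Check the linked issue to see which later release contains a fix.

```xml
<!-- Assumed workaround: use the release that reportedly does not show the error -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.17.0</version>
</dependency>
```

After changing the version, rebuild and redeploy the streaming job so the workers pick up the downgraded SDK.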

Upvotes: 1
