Reputation: 13
I am trying to write to Spanner from a DataFlow streaming job by using
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.18.0</version>
</dependency>
After mapping the data to PCollection<Mutation>
I am writing them to Spanner via SpannerIO.write
Pipeline pipeline = Pipeline.create(options);
PCollection<Mutation> mutations = pipeline.apply...
mutations.apply("WriteMutations", SpannerIO.write()
.withInstanceId(INSTANCE_ID)
.withDatabaseId(DATABASE_ID)
);
pipeline.run();
However, it throws
java.lang.IllegalStateException: Sorter should be null here
at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$GatherBundleAndSortFn.startBundle (SpannerIO.java:1080)
What would be the cause of this exception?
The following pipeline produces the exception. I test it with 20 workers but it looks like it's independent of the data load.
import com.google.cloud.spanner.Mutation;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import java.util.UUID;
public final class TestPipeline {
private static final Duration WINDOW_DURATION = Duration.standardSeconds(1);
private static final String DATABASE_ID = "test";
private static final String INSTANCE_ID = "test-spanner";
private static final String TEST_TABLE = "test";
public static void main(String[] args) {
TestPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(TestPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read pubsub", PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()))
.apply("Parse message", ParDo.of(new ProcessMessage()))
.apply("Windowing", Window.<Mutation>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(WINDOW_DURATION)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply("Write mutations", SpannerIO.write()
.withInstanceId(INSTANCE_ID)
.withDatabaseId(DATABASE_ID)
);
pipeline.run();
}
private static class ProcessMessage extends DoFn<PubsubMessage, Mutation> {
@ProcessElement
public void processElement(@Element final PubsubMessage message,
final OutputReceiver<Mutation> out) {
out.output(Mutation.newInsertOrUpdateBuilder(TEST_TABLE)
.set("id").to(UUID.randomUUID().toString())
.set("string").to("test")
.set("count").to(Long.MAX_VALUE)
.build()
);
}
}
interface TestPipelineOptions extends DataflowPipelineOptions {
void setInputSubscription(String inputSubscription);
@Description("Google Pubsub subscription id.")
String getInputSubscription();
}
}
Table CREATE TABLE test (id STRING(50) NOT NULL, string STRING(50) NOT NULL, count INT64) PRIMARY KEY (id);
Upvotes: 1
Views: 338
Reputation: 251
This issue seems to occur with apache beam version 2.18, but not with version 2.17.
The issue with apache beam version 2.18 is tracked here: https://issues.apache.org/jira/browse/BEAM-9505
Upvotes: 1