Reputation: 169
Using Google Dataflow, I need to read data from Google Spanner and write it into Pub/Sub as a batch process. I have more than 100,000 records in Spanner, so I need to read those records and publish them to a Pub/Sub topic in batches, with 1,000 records as the limit for each publish iteration.
Please help me with this.
Upvotes: 1
Views: 839
Reputation: 276
I would like to understand the exact use case and what you are trying to accomplish with this.
You could use the code below for reading from Spanner and batching at the Pub/Sub publishing level. It batches Pub/Sub messages while publishing (here, one row gets published as one Pub/Sub message).
Pipeline Process
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

CustomPipelineOptions options =
    PipelineOptionsFactory.fromArgs(pipelineArgs).as(CustomPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

SpannerConfig spannerConfig = SpannerConfig.create()
    .withProjectId(options.getSpannerProjectId())
    .withInstanceId(options.getSpannerInstanceId())
    .withDatabaseId(options.getSpannerDatabaseId());

pipeline.apply(SpannerIO.read()
        .withSpannerConfig(spannerConfig)
        .withTable("TestTable")
        .withColumns(Arrays.asList("TestColumn")))   // one column read per row
    .apply(ParDo.of(new StructToPubSubConverter()))  // Struct -> PubsubMessage
    .apply(PubsubIO.writeMessages()
        .to(options.getPubsubWriteTopic())
        .withMaxBatchSize(1000));                    // batch size per publish request

pipeline.run();
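CustomPipelineOptions is not defined in the snippet above; a minimal sketch of it could look like the following, with option names assumed to match the getters used above:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical options interface matching the getters used in the pipeline.
public interface CustomPipelineOptions extends PipelineOptions {
    @Description("Spanner project ID")
    String getSpannerProjectId();
    void setSpannerProjectId(String value);

    @Description("Spanner instance ID")
    String getSpannerInstanceId();
    void setSpannerInstanceId(String value);

    @Description("Spanner database ID")
    String getSpannerDatabaseId();
    void setSpannerDatabaseId(String value);

    @Description("Pub/Sub topic to publish to")
    String getPubsubWriteTopic();
    void setPubsubWriteTopic(String value);
}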
Spanner to PubSub Converter
import java.util.HashMap;

import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

public static class StructToPubSubConverter extends DoFn<Struct, PubsubMessage> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        Struct struct = context.element();
        // Read the first (and only) selected column from the row.
        String testColumn = struct.getString(0);
        context.output(new PubsubMessage(testColumn.getBytes(), new HashMap<>()));
    }
}
Not sure if this addresses your problem, but it should provide a fair idea. Sharing more details would be helpful.
Upvotes: 0
Reputation: 1917
One way to do this is using the Dataflow connector.
Reading data from Cloud Spanner
To read from Cloud Spanner, apply the SpannerIO.read() transform. Configure the read using the methods in the SpannerIO.Read class. Applying the transform returns a PCollection, where each element in the collection represents an individual row returned by the read operation. You can read from Cloud Spanner with and without a specific SQL query, depending on your desired output.
Applying the SpannerIO.read() transform returns a consistent view of data by performing a strong read. Unless you specify otherwise, the result of the read is snapshotted at the time that you started the read. See reads for more information about the different types of reads Cloud Spanner can perform.
see: https://cloud.google.com/spanner/docs/dataflow-connector
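For example, reading with an explicit SQL query instead of withTable/withColumns looks roughly like this (a minimal sketch, assuming a SpannerConfig built as in the other answer; the table and column names are placeholders):

import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.values.PCollection;

// Each element of the resulting PCollection is one row of the query result.
PCollection<Struct> rows = pipeline.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT TestColumn FROM TestTable"));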
This thread seems to explain how to write from Dataflow to Pub/Sub: Publish messages to Pubsub topic in Dataflow
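In short, the write side is a PubsubIO.writeMessages() transform applied to a PCollection<PubsubMessage> (a minimal sketch; the variable and topic names are placeholders):

// messages is a PCollection<PubsubMessage> produced earlier in the pipeline.
messages.apply(PubsubIO.writeMessages()
    .to("projects/<project>/topics/<topic>"));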
Upvotes: 1