None of the provided Dataflow templates match what I need to do, so I'm trying to write my own. I managed to run example code like the word count example without issue, so I tried to cobble together parts of separate examples that read from BigQuery and write to Spanner, but there are just so many things in the source code that I don't understand and can't adapt to my own problem.
I'm REALLY lost on this and any help is greatly appreciated!
The goal is to use Dataflow and the Apache Beam SDK to read from a BigQuery table with 3 string fields and 1 integer field, concatenate the contents of the 3 string fields into one string, and put that new string in a new field called "key". Then I want to write the key field and the (unchanged) integer field to a Spanner table that already exists, ideally appending rows with a new key and updating the integer field of rows whose key already exists.
I'm trying to do this in Java because there is no Spanner I/O connector for Python. Any advice on doing this with Python is much appreciated.
For now I would be super happy if I could just read a table from BigQuery and write whatever I get from that table to a table in Spanner, but I can't even make that happen.
Problems:
Honestly, I'm too embarrassed to even show the code I'm trying to run.
public class SimpleTransfer {
  public static void main(String[] args) {
    // Create and set your PipelineOptions.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    // For Cloud execution, set the Cloud Platform project, staging location, and specify DataflowRunner.
    options.setProject("myproject");
    options.setStagingLocation("gs://mybucket");
    options.setRunner(DataflowRunner.class);
    // Create the Pipeline with the specified options.
    Pipeline p = Pipeline.create(options);
    String tableSpec = "database.mytable";
    // read whole table from bigquery
    PCollection<TableRow> rowsFromBigQuery =
        p.apply(
            BigQueryIO.readTableRows()
                .from(tableSpec));
    // Hopefully some day add a transform
    // Somehow make a Mutation
    PCollection<Mutation> mutation = rowsFromBigQuery;
    // Only way I found to write to Spanner, not even sure if that works.
    SpannerWriteResult result = mutation.apply(
        SpannerIO.write().withInstanceId("myinstance").withDatabaseId("mydatabase").grouped());
    p.run().waitUntilFinish();
  }
}
It's intimidating to deal with these strange data types, but once you get used to the TableRow and Mutation types, you'll be able to code robust pipelines.
The first thing you need to do is take your PCollection of TableRows and convert them into an intermediate format that is convenient for you. Let's use Beam's KV, which defines a key-value pair. In the following snippet, we extract the values from the TableRow and concatenate the string you want:
PCollection<KV<String, Long>> myKvPairsPCollection = rowsFromBigQuery
    .apply(
        MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(),
                                             TypeDescriptors.longs()))
            .via((TableRow tableRow) -> KV.of(
                (String) tableRow.get("myKey1")
                    + (String) tableRow.get("myKey2")
                    + (String) tableRow.get("myKey3"),
                // readTableRows() can return INTEGER columns as strings,
                // so parse the value instead of casting it to Integer.
                Long.parseLong(String.valueOf(tableRow.get("myIntegerField"))))));
Finally, to write to Spanner, we use Mutation-type objects, which define the kind of mutation that we want to apply to a row in Spanner. We'll do it with another MapElements transform, which takes N inputs and returns N outputs. We define the insert-or-update mutations there:
PCollection<Mutation> mutations = myKvPairsPCollection
    .apply(
        MapElements.into(TypeDescriptor.of(Mutation.class))
            .via((KV<String, Long> elm) -> Mutation.newInsertOrUpdateBuilder("myTableName")
                .set("key").to(elm.getKey())
                .set("value").to(elm.getValue())
                // build() turns the WriteBuilder into a Mutation.
                .build()));
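TableRow is a Map<String, Object> under the hood, so the conversion done in the first MapElements step can be tried out in plain Java before wiring it into a pipeline. The sketch below is a hypothetical helper (RowToKv and toKeyValue are made-up names) that uses a HashMap in place of a real TableRow and keeps the placeholder column names from the snippets above. It parses the integer with Long.parseLong on the assumption that readTableRows() may hand INTEGER columns back as strings.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

public class RowToKv {
  // Hypothetical helper: a plain Map stands in for TableRow (itself a Map<String, Object>).
  public static Map.Entry<String, Long> toKeyValue(Map<String, Object> row) {
    // Concatenate the three string columns into the new key.
    String key = (String) row.get("myKey1")
        + (String) row.get("myKey2")
        + (String) row.get("myKey3");
    // Works whether the integer column holds a number (7L) or its string form ("7").
    long value = Long.parseLong(String.valueOf(row.get("myIntegerField")));
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("myKey1", "a");
    row.put("myKey2", "b");
    row.put("myKey3", "c");
    row.put("myIntegerField", "42"); // INTEGER delivered as a string
    Map.Entry<String, Long> kv = toKeyValue(row);
    System.out.println(kv.getKey() + " -> " + kv.getValue()); // prints "abc -> 42"
  }
}
```

In the pipeline, the body of toKeyValue is what goes inside .via(...), with KV.of(key, value) in place of the Map.Entry.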
And then you can pass that output to SpannerIO.write(). The whole pipeline looks something like this:
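import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.spanner.Mutation;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SimpleTransfer {
  public static void main(String[] args) {
    // Same options as in the question.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("myproject");
    options.setStagingLocation("gs://mybucket");
    options.setRunner(DataflowRunner.class);
    Pipeline p = Pipeline.create(options);
    String tableSpec = "database.mytable";
    // read whole table from bigquery
    PCollection<TableRow> rowsFromBigQuery =
        p.apply(
            BigQueryIO.readTableRows().from(tableSpec));
    // Take in a TableRow, and convert it into a key-value pair
    PCollection<Mutation> mutations = rowsFromBigQuery
        // First we make the TableRows into the appropriate key-value
        // pair of string key and integer (parsed, since readTableRows()
        // can return INTEGER columns as strings).
        .apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(),
                                                 TypeDescriptors.longs()))
                .via((TableRow tableRow) -> KV.of(
                    (String) tableRow.get("myKey1")
                        + (String) tableRow.get("myKey2")
                        + (String) tableRow.get("myKey3"),
                    Long.parseLong(String.valueOf(tableRow.get("myIntegerField"))))))
        // Now we construct the mutations
        .apply(
            MapElements.into(TypeDescriptor.of(Mutation.class))
                .via((KV<String, Long> elm) -> Mutation.newInsertOrUpdateBuilder("myTableName")
                    .set("key").to(elm.getKey())
                    .set("value").to(elm.getValue())
                    .build()));
    // Now we pass the mutations to spanner. write() accepts a PCollection<Mutation>
    // directly; .grouped() is only for a PCollection<MutationGroup>.
    SpannerWriteResult result = mutations.apply(
        SpannerIO.write()
            .withInstanceId("myinstance")
            .withDatabaseId("mydatabase"));
    p.run().waitUntilFinish();
  }
}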
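The question also asks to append rows with new keys and update the integer of rows whose key already exists. That is what newInsertOrUpdateBuilder gives you: Spanner inserts the row if its primary key is new, and otherwise overwrites the columns the mutation sets while leaving other columns as they were. As a rough mental model only (not Spanner's API; UpsertModel and applyInsertOrUpdate are hypothetical names, and it assumes "key" is the table's primary key), the effect on the key/value columns resembles Map.put:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertModel {
  // Hypothetical model: the Spanner table as a map from primary key ("key") to the "value" column.
  public static Map<String, Long> applyInsertOrUpdate(
      Map<String, Long> table, List<Map.Entry<String, Long>> rows) {
    Map<String, Long> result = new LinkedHashMap<>(table); // leave the input untouched
    for (Map.Entry<String, Long> row : rows) {
      // New key: a row is appended. Existing key: its value is replaced.
      result.put(row.getKey(), row.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> table = new LinkedHashMap<>();
    table.put("abc", 1L);
    Map<String, Long> updated = applyInsertOrUpdate(table,
        List.of(Map.entry("abc", 5L), Map.entry("xyz", 2L)));
    System.out.println(updated); // prints {abc=5, xyz=2}
  }
}
```

One difference: the model applies rows in list order, while a Beam PCollection has no ordering, so if two BigQuery rows concatenate to the same key, which value ends up in Spanner is not defined.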