I am trying to write a Dataflow pipeline to migrate data from Google Datastore to BigQuery using Python. After some searching I figured out that I need three steps:
1. ReadFromDatastore
2. Convert the entities to Python dicts or TableRows
3. WriteToBigQuery
Now, the first and last steps are simple, as they are existing transforms. But I am having a hard time finding a good way to do the second step.
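For reference, here is the skeleton I have so far; step 2 is the missing piece. The project, kind, dataset, and table names are placeholders, and this assumes the older v1 datastoreio API:

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

# Query every entity of the kind I want to migrate.
query = query_pb2.Query()
query.kind.add().name = 'KindName'

with beam.Pipeline() as p:
    (p
     | 'Read' >> ReadFromDatastore(project='my-project-id', query=query)
     # Step 2 should go here: Entity protobufs -> Python dicts/TableRows
     | 'Write' >> beam.io.WriteToBigQuery(
           'my-project-id:my_dataset.my_table',
           schema='property1:STRING,version:INTEGER'))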
I wrote the output of ReadFromDatastore to a text file, and the JSON looks like this:
key {
  partition_id {
    project_id: "ProjectID"
  }
  path {
    kind: "KindName"
    id: 9999
  }
}
properties {
  key: "property1"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property2"
  value {
    string_value: ""
  }
}
properties {
  key: "property3"
  value {
    boolean_value: false
  }
}
properties {
  key: "created"
  value {
    timestamp_value {
      seconds: 4444
      nanos: 2222
    }
  }
}
properties {
  key: "created_by"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "date_created"
  value {
    timestamp_value {
      seconds: 4444
    }
  }
}
properties {
  key: "property4"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property5"
  value {
    array_value {
      values {
        meaning: 00
        string_value: "link"
        exclude_from_indexes: true
      }
    }
  }
}
properties {
  key: "property6"
  value {
    null_value: NULL_VALUE
  }
}
properties {
  key: "property7"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property8"
  value {
    string_value: ""
  }
}
properties {
  key: "property9"
  value {
    timestamp_value {
      seconds: 3333
      nanos: 3333
    }
  }
}
properties {
  key: "property10"
  value {
    meaning: 00
    string_value: ""
    exclude_from_indexes: true
  }
}
properties {
  key: "property11"
  value {
    boolean_value: false
  }
}
properties {
  key: "property12"
  value {
    array_value {
      values {
        key_value {
          partition_id {
            project_id: "project_id"
          }
          path {
            kind: "Another_kind_name"
            id: 4444
          }
        }
      }
    }
  }
}
properties {
  key: "property13"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "version"
  value {
    integer_value: 4444
  }
}
key {
  partition_id {
    project_id: "ProjectID"
  }
  path {
    kind: "KindName"
    id: 9999
  }
}
...
(next entity/row)
Do I have to write a custom function to convert these entities to Python dicts in order to write to BigQuery, or are there any functions/libraries from Google Datastore or Apache Beam that I can use?
I found an article describing what I am trying to do, but the code shown is in Java.
The output of the ReadFromDatastore transform is a collection of Entity-typed protocol buffers; what you dumped above is the protobuf text format, not JSON. To convert a protobuf to JSON, you can check this question: Protobuf to json in python
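For example, here is a minimal sketch of such a conversion function using MessageToDict from the protobuf library; the flattening of each property's value wrapper is an assumption about how you want to map properties to columns:

from google.protobuf.json_format import MessageToDict

def my_proto_to_json_fn(entity_pb):
    # MessageToDict mirrors the nested text dump above, e.g.
    # {'key': {...}, 'properties': {'property1': {'stringValue': ...}}}
    as_dict = MessageToDict(entity_pb)
    row = {}
    for name, value in as_dict.get('properties', {}).items():
        if 'nullValue' in value:
            row[name] = None
            continue
        # Each property value holds exactly one populated *Value field
        # (stringValue, booleanValue, timestampValue, ...); pick it out.
        row[name] = next(
            (v for k, v in value.items() if k.endswith('Value')), None)
    # Note: arrayValue/keyValue entries stay nested dicts and will need
    # extra handling to fit a flat BigQuery schema.
    return row

Note that MessageToDict renames fields to camelCase (string_value becomes stringValue) and serializes timestamps as RFC 3339 strings, which a BigQuery TIMESTAMP column should accept.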
You would do:
p | ReadFromDatastore(...) | beam.Map(my_proto_to_json_fn) | beam.io.WriteToBigQuery(...)
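Put together with the query from your question and the conversion function above, the pipeline could look like this; the table name and schema are placeholders you would replace with your own:

(p
 | 'Read' >> ReadFromDatastore(project='my-project-id', query=query)
 | 'ToRows' >> beam.Map(my_proto_to_json_fn)
 | 'Write' >> beam.io.WriteToBigQuery(
       'my-project-id:my_dataset.my_table',
       # Declare only the columns you want to keep; names must match the
       # dict keys produced by my_proto_to_json_fn.
       schema='property1:STRING,property3:BOOLEAN,created:TIMESTAMP,version:INTEGER',
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))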