Reputation: 650
I'm trying to build a Dataflow pipeline that triggers on a JSON file upload to Google Cloud Storage and writes the data to Cloud Datastore.
According to the Dataflow template, each line of the JSON file must be in the Datastore data object format, defined here.
This is what my JSON file looks like; I'm trying to adapt it to the Datastore data object:
{
  "userId": "u-skjbdw34jh3gx",
  "rowRanks": [
    {
      "originalTrigger": "recent",
      "programmedRowPos": "VR1",
      "reorderedRowPos": 0
    },
    {
      "originalTrigger": "discovery",
      "programmedRowPos": "VR1",
      "reorderedRowPos": 1
    }
  ]
}
Here is how far I've gotten trying to adapt it to the data object linked above:
{
  "key": {
    "partitionId": {
      "projectId": "gcp-project-id",
      "namespaceId": "spring-demo"
    },
    "path": {
      "kind": "demo",
      "name": "userId"
    }
  },
  "properties": {
    "userId": {
      "stringValue": "01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"
    }
  }
}
Here is the error I'm getting in Dataflow when it tries to write to Datastore:
com.google.protobuf.InvalidProtocolBufferException: java.io.EOFException: End of input at line 1 column 2 path $.
at com.google.protobuf.util.JsonFormat$ParserImpl.merge(JsonFormat.java:1195)
at com.google.protobuf.util.JsonFormat$Parser.merge(JsonFormat.java:370)
at com.google.cloud.teleport.templates.common.DatastoreConverters$EntityJsonParser.merge(DatastoreConverters.java:497)
at com.google.cloud.teleport.templates.common.DatastoreConverters$JsonToEntity.processElement(DatastoreConverters.java:351)
Upvotes: 4
Views: 757
Reputation: 7058
If I understood your input data format and desired output correctly, this JS code should do the trick:
var data = {
  "userId": "u-skjbdw34jh3gx",
  "rowRanks": [
    {
      "originalTrigger": "recent",
      "programmedRowPos": "VR1",
      "reorderedRowPos": 0
    },
    {
      "originalTrigger": "discovery",
      "programmedRowPos": "VR1",
      "reorderedRowPos": 1
    }
  ]
};

// Build the entity key. Note that path must be an array of path elements.
var entity = {};
entity.key = {};
entity.key.partitionId = {};
entity.key.partitionId.projectId = "gcp-project-id";
entity.key.partitionId.namespaceId = "spring-demo";
var path = {};
path.kind = "demo";
path.name = "userId";
entity.key.path = [];
entity.key.path.push(path);

// Map the top-level fields to typed Datastore property values.
entity.properties = {};
entity.properties.userId = {};
entity.properties.userId.stringValue = data.userId;
entity.properties.rowRanks = {};
entity.properties.rowRanks.arrayValue = {};

// Each element of rowRanks becomes an entityValue inside the arrayValue.
var arrayValues = [];
data.rowRanks.forEach(buildArrayValue);
function buildArrayValue(row) {
  var temp = {};
  temp.entityValue = {};
  temp.entityValue.properties = {};
  temp.entityValue.properties.originalTrigger = {};
  temp.entityValue.properties.originalTrigger.stringValue = row.originalTrigger;
  temp.entityValue.properties.programmedRowPos = {};
  temp.entityValue.properties.programmedRowPos.stringValue = row.programmedRowPos;
  temp.entityValue.properties.reorderedRowPos = {};
  temp.entityValue.properties.reorderedRowPos.integerValue = row.reorderedRowPos;
  arrayValues.push(temp);
}
entity.properties.rowRanks.arrayValue.values = arrayValues;
document.write(JSON.stringify(entity));
We basically build the rowRanks array with the forEach() loop. Note that path needs to be an array though (reference).
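Applied to the attempt in the question, the corrected key would look like this (values taken from the question):
"key": {
  "partitionId": {
    "projectId": "gcp-project-id",
    "namespaceId": "spring-demo"
  },
  "path": [
    {
      "kind": "demo",
      "name": "userId"
    }
  ]
}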
Now we modify it slightly to run as the template's transform function instead of in a browser, upload the files to GCS, and follow the instructions here to execute it:
gcloud dataflow jobs run test-datastore \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Datastore \
--parameters=javascriptTextTransformGcsPath=gs://$BUCKET/*.js,errorWritePath=gs://$BUCKET/errors.txt,javascriptTextTransformFunctionName=transform,textReadPattern=gs://$BUCKET/*.json,datastoreWriteProjectId=$PROJECT
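Here, $PROJECT and $BUCKET are assumed to have been set beforehand, for example (the bucket name is hypothetical):
export PROJECT=gcp-project-id
export BUCKET=my-dataflow-demo-bucket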
The full content of the JS file uploaded to GCS is:
function transform(elem) {
  var data = JSON.parse(elem);
  var entity = {};
  entity.key = {};
  entity.key.partitionId = {};
  entity.key.partitionId.projectId = "gcp-project-id";
  entity.key.partitionId.namespaceId = "spring-demo";
  var path = {};
  path.kind = "demo";
  path.name = "userId";
  entity.key.path = [];
  entity.key.path.push(path);
  entity.properties = {};
  entity.properties.userId = {};
  entity.properties.userId.stringValue = data.userId;
  entity.properties.rowRanks = {};
  entity.properties.rowRanks.arrayValue = {};
  var arrayValues = [];
  data.rowRanks.forEach(buildArrayValue);
  function buildArrayValue(row) {
    var temp = {};
    temp.entityValue = {};
    temp.entityValue.properties = {};
    temp.entityValue.properties.originalTrigger = {};
    temp.entityValue.properties.originalTrigger.stringValue = row.originalTrigger;
    temp.entityValue.properties.programmedRowPos = {};
    temp.entityValue.properties.programmedRowPos.stringValue = row.programmedRowPos;
    temp.entityValue.properties.reorderedRowPos = {};
    temp.entityValue.properties.reorderedRowPos.integerValue = row.reorderedRowPos;
    arrayValues.push(temp);
  }
  entity.properties.rowRanks.arrayValue.values = arrayValues;
  return JSON.stringify(entity);
}
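One caveat: path.name is hard-coded to "userId" here, so every input line maps to the same entity key and each write upserts the previous one. If you want one entity per record, a likely tweak (assuming the key should come from the data itself) would be:
path.name = data.userId; // derive the key name from the record instead of the literal "userId"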
The job runs successfully for me, and the data is written to Datastore.
Let me know if that helps you.
Upvotes: 1
Reputation: 650
The JSON file should have each Google Cloud Datastore object on a single line. Hence the error quoted in the question: End of input at line 1 column 2 path $.
It should be as follows:
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
Obviously, the JSON file will consist of thousands of objects, but each of them must be on a single line:
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
Upvotes: 4