Reputation: 3262
TLDR: I'm looking for a way to write Datastore entities from Dataflow without overwriting entities that already exist.
I'm using Dataflow 2.0.0 (Beam) to update entities in Google Datastore. My pipeline loads entities from Datastore, updates them, and then saves them back into Datastore (overwriting the existing entities).
However, during the update process I also discover additional entities that may or may not already exist. To avoid overwriting existing entities, I previously loaded all the entities from Datastore and reduced them (grouping by key), dropping any newly discovered entity whose key already existed.
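The group-by-key rule described above can be sketched roughly as follows. This is a plain-Java illustration, not the actual pipeline code: the `Candidate` class and the `dedupe` method are hypothetical stand-ins for Datastore entities and the Beam grouping step, and the `existing` flag is an assumption about how the two sources are told apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PreferExisting {
    /** Hypothetical stand-in for an entity: a key, a payload, and where it came from. */
    static final class Candidate {
        final String key;
        final String payload;
        final boolean existing; // true if loaded from Datastore, false if newly discovered

        Candidate(String key, String payload, boolean existing) {
            this.key = key;
            this.payload = payload;
            this.existing = existing;
        }
    }

    /** Groups candidates by key and keeps one per key, preferring existing entities. */
    static List<Candidate> dedupe(List<Candidate> candidates) {
        Map<String, Candidate> byKey = new LinkedHashMap<>();
        for (Candidate c : candidates) {
            Candidate current = byKey.get(c.key);
            // Keep the first candidate seen for a key, unless an existing
            // entity shows up later and can replace a newly discovered one.
            if (current == null || (!current.existing && c.existing)) {
                byKey.put(c.key, c);
            }
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Candidate> merged = dedupe(Arrays.asList(
            new Candidate("a", "discovered", false),
            new Candidate("a", "stored", true),
            new Candidate("b", "discovered", false)));
        for (Candidate c : merged) {
            System.out.println(c.key + " -> " + c.payload);
        }
    }
}
```

The rule only works when every existing entity is present in the input, which is why it stops protecting entities once the input is limited to a batch.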
As the number of entities grows, I want to avoid loading all of them into Dataflow and instead process them in batches, oldest timestamps first. The problem is that existing entities get overwritten whenever they are not part of the current batch.
I'm writing the entities to Datastore with the following call (in two places: once for existing entities and once for new ones):
collection.apply(DatastoreIO.v1().write().withProjectId("..."))
It would be really nice if there were something like a DatastoreIO.v1().writeNew() method, but sadly it doesn't exist. Thank you for any help.
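To be explicit about the semantics I'm after, here is a minimal in-memory sketch. A plain Java map stands in for Datastore, and `WriteNewSketch`, `write`, and `writeNew` are hypothetical names for illustration only; none of this is part of the DatastoreIO API.

```java
import java.util.HashMap;
import java.util.Map;

class WriteNewSketch {
    // In-memory stand-in for Datastore: entity key -> payload.
    private final Map<String, String> store = new HashMap<>();

    /** Overwriting write: always stores the payload, replacing any existing entry. */
    void write(String key, String payload) {
        store.put(key, payload);
    }

    /** Hypothetical writeNew: stores the payload only if the key is absent. */
    boolean writeNew(String key, String payload) {
        // putIfAbsent returns null when nothing was stored under the key before.
        return store.putIfAbsent(key, payload) == null;
    }

    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        WriteNewSketch sketch = new WriteNewSketch();
        sketch.write("k1", "existing");
        boolean written = sketch.writeNew("k1", "discovered");
        System.out.println(written + " " + sketch.get("k1"));
    }
}
```

With this behaviour, entities outside the current batch would stay untouched even though the pipeline never loaded them.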
Upvotes: 3
Views: 1540
Reputation: 316
If you want to write a new entity that does not yet exist in Datastore, just create one with a new key and write it:
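    import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
    import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.Key;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // p (the Pipeline) and project_id are assumed to be defined elsewhere.
    List<String> keyNames = Arrays.asList("L1", "L2"); // The new key names to store
    PTransform<PCollection<Entity>, ?> write =
        DatastoreIO.v1().write().withProjectId(project_id); // A typical write operation

    p.apply("GetInMemory", Create.of(keyNames))
        .setCoder(StringUtf8Coder.of()) // "L1" and "L2" are loaded
        .apply("Proc1", ParDo.of(new DoFn<String, Entity>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Key.Builder key = makeKey("k2", c.element()); // Kind "k2", name from the element
                final Entity entity = Entity.newBuilder()
                    .setKey(key) // Set the key
                    .putProperties("p1",
                        makeValue("test constant value").setExcludeFromIndexes(true).build())
                    .build();
                c.output(entity);
            }
        }))
        .apply(write); // Write them
    p.run();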
The complete code is available in my repository at https://github.com/yiu31802/gcp-project/commit/cc224b34
Upvotes: 1