Mingwei Samuel

Reputation: 3262

Google Dataflow: Write to Datastore without overwriting existing entities

TL;DR: I'm looking for a way to write entities to Datastore from Dataflow without overwriting entities that already exist.

I'm using Dataflow 2.0.0 (Beam) to update entities in Google Datastore. My pipeline loads entities from Datastore, updates them, and then saves them back to Datastore (overwriting the existing entities).

However, during the update process I also discover additional entities that may or may not already exist. To avoid overwriting existing entities, I used to load all the entities from Datastore and reduce them (group by key), dropping any newly discovered entity whose key already existed.
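
For illustration, that reduce step can be sketched in plain Java, without Beam or Datastore dependencies. This is only a minimal sketch under stated assumptions: the Item class, its existing flag, and the dedupe method are hypothetical names, and in a real pipeline the grouping would be done by a GroupByKey over the entity keys rather than an in-memory map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeepExisting {
    /** Hypothetical stand-in for a Datastore entity: a key, a payload, and its origin. */
    static final class Item {
        final String key;
        final String payload;
        final boolean existing; // true if loaded from Datastore, false if newly discovered

        Item(String key, String payload, boolean existing) {
            this.key = key;
            this.payload = payload;
            this.existing = existing;
        }
    }

    /** Groups items by key and keeps one per key, preferring existing items over new ones. */
    static List<Item> dedupe(List<Item> items) {
        Map<String, Item> byKey = new LinkedHashMap<>();
        for (Item item : items) {
            Item current = byKey.get(item.key);
            // Keep the first item seen for a key, unless an existing item replaces a new one.
            if (current == null || (item.existing && !current.existing)) {
                byKey.put(item.key, item);
            }
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Item> merged = dedupe(Arrays.asList(
                new Item("a", "new-a", false),  // newly discovered, collides with an existing key
                new Item("a", "old-a", true),   // already stored
                new Item("b", "new-b", false)   // genuinely new
        ));
        for (Item item : merged) {
            System.out.println(item.key + "=" + item.payload); // prints a=old-a, then b=new-b
        }
    }
}
```

The catch is the one described in this question: the approach only works if every existing entity is present in the input, so it stops protecting old entities once they are read in batches.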

As the number of entities grows, I want to avoid loading all of them into Dataflow and instead process them in batches, oldest timestamps first. The problem I'm running into is that existing entities get overwritten when they are not part of the current batch.


I'm writing the entities to Datastore in two places (one for existing entities and one for new entities) using:

collection.apply(DatastoreIO.v1().write().withProjectId("..."))

It would be really nice if there were something like a DatastoreIO.v1().writeNew() method, but sadly it doesn't exist. Thank you for any help.

Upvotes: 3

Views: 1540

Answers (1)

Junji Shimagaki

Reputation: 316

If you want to write a new entity that does not yet exist in Datastore, create it with a new key and write it:

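import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Fragment: assumes `p` is an existing Pipeline and `project_id` holds the GCP project ID.
List<String> keyNames = Arrays.asList("L1", "L2"); // Assume you already have the new key names to store
PTransform<PCollection<Entity>, ?> write =
        DatastoreIO.v1().write().withProjectId(project_id); // A typical write operation

p
    .apply("GetInMemory", Create.of(keyNames)).setCoder(StringUtf8Coder.of()) // L1 and L2 are loaded
    .apply("Proc1", ParDo.of(new DoFn<String, Entity>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Key.Builder key = makeKey("k2", c.element()); // Build a key of kind "k2" from the key name
            Entity entity = Entity.newBuilder()
                    .setKey(key) // Set the key
                    .putProperties("p1", makeValue("test constant value")
                            .setExcludeFromIndexes(true).build())
                    .build();
            c.output(entity);
        }
    }))
    .apply(write); // Write them
p.run();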

The complete code is available in my repository at https://github.com/yiu31802/gcp-project/commit/cc224b34

Upvotes: 1
