Kassem Shehady

Reputation: 760

Using Google Datastore to check items in streaming mode

I want to insert items into Google Datastore only if they don't already exist there. I wrote a Dataflow streaming job to do this.

Job

static class RawToObjectConverter extends DoFn<String, Entity> {
    @Override
    public void processElement(ProcessContext c) {
        // `rq` and `datastore` are defined elsewhere (not shown).
        // Look for an existing Post with the same postid.
        Query<Entity> query = Query.entityQueryBuilder()
                .kind("Post")
                .filter(PropertyFilter.eq("postid", rq.postid))
                .build();
        QueryResults<Entity> posts = datastore.run(query);

        // Emit a new entity only when no matching Post exists.
        if (posts == null || !posts.hasNext()) {
            Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))
                    .set("postid", "1")
                    .set("title", "p1")
                    .build();
            c.output(post);
        }
    }
}

Problem

lines.apply(ParDo.of(new RawToObjectConverter()))
    .apply(DatastoreIO.v1().write().withProjectId(projectid));

The method apply(PTransform<? super PCollection<Entity>,OutputT>) in the type PCollection<Entity> is not applicable for the arguments (DatastoreV1.Write)

Also, should I use com.google.cloud.datastore.Datastore or com.google.datastore.v1.Entity?

Upvotes: 0

Views: 226

Answers (1)

Vikas Kedigehalli

Reputation: 306

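DatastoreIO.v1().write() accepts only a PCollection of com.google.datastore.v1.Entity, which is why the apply call does not compile. You need to convert your com.google.cloud.datastore.Entity to com.google.datastore.v1.Entity in RawToObjectConverter before using DatastoreIO.v1().write(), and declare the DoFn's output type as com.google.datastore.v1.Entity to match. Use the toPb method for the conversion: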

c.output(post.toPb());
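
To illustrate why the conversion makes the compile error go away, here is a minimal, library-free sketch of the type mismatch. Every name in it is a hypothetical stand-in, not the real SDK: ClientEntity plays the role of com.google.cloud.datastore.Entity, ProtoEntity plays com.google.datastore.v1.Entity, Coll stands in for PCollection, Transform for PTransform, and Write for DatastoreV1.Write. It only models the generics, under the assumption that the write transform is typed to the protobuf entity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class EntityTypeMismatchSketch {

    // Hypothetical stand-in for com.google.datastore.v1.Entity (protobuf type).
    static final class ProtoEntity {
        final String postId;
        final String title;
        ProtoEntity(String postId, String title) {
            this.postId = postId;
            this.title = title;
        }
    }

    // Hypothetical stand-in for com.google.cloud.datastore.Entity (client type).
    static final class ClientEntity {
        final String postId;
        final String title;
        ClientEntity(String postId, String title) {
            this.postId = postId;
            this.title = title;
        }
        // Plays the role of toPb(): converts to the protobuf-style type.
        ProtoEntity toPb() {
            return new ProtoEntity(postId, title);
        }
    }

    // Hypothetical stand-in for PTransform<InputT, OutputT>.
    interface Transform<I, O> {
        O expand(I input);
    }

    // Hypothetical stand-in for PCollection<T>; apply() has the same shape
    // as the signature quoted in the compiler error.
    static final class Coll<T> {
        final List<T> items;
        Coll(List<T> items) {
            this.items = items;
        }
        <O> O apply(Transform<? super Coll<T>, O> transform) {
            return transform.expand(this);
        }
        <U> Coll<U> map(Function<T, U> fn) {
            List<U> out = new ArrayList<>();
            for (T item : items) {
                out.add(fn.apply(item));
            }
            return new Coll<>(out);
        }
    }

    // Hypothetical stand-in for DatastoreV1.Write: it only accepts a
    // collection of ProtoEntity and returns how many items it "wrote".
    static final class Write implements Transform<Coll<ProtoEntity>, Integer> {
        @Override
        public Integer expand(Coll<ProtoEntity> input) {
            return input.items.size();
        }
    }

    static int writeAll(List<ClientEntity> posts) {
        Coll<ClientEntity> clientPosts = new Coll<>(posts);
        // clientPosts.apply(new Write());  // does not compile: wrong element type
        // Converting each element first makes the types line up.
        return clientPosts.map(ClientEntity::toPb).apply(new Write());
    }

    public static void main(String[] args) {
        int written = writeAll(Arrays.asList(new ClientEntity("1", "p1")));
        System.out.println("written = " + written);
    }
}
```

In the real job the same idea applies inside the DoFn: the element type flowing into the write step has to be the protobuf entity, so the conversion happens before c.output.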

Upvotes: 4
