Reputation: 760
I want to insert items into Google Datastore if they don't exist in it. I write a dataflow streaming job.
Job
static class RawToObjectConverter extends DoFn<String, Entity> {
@Override
public void processElement(ProcessContext c) {
Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid))
.build();
QueryResults<Entity> posts = datastore.run(query);
if (posts == null || !posts.hasNext()) {
Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))
.set("postid", "1")
.set("title", "p1")
.build();
c.output(post);
}
}
}
Problem
lines.apply(ParDo.of(new RawToObjectConverter()))
.apply(DatastoreIO.v1().write().withProjectId(projectid));
The method apply(PTransform<? super PCollection<Entity>,OutputT>)
in the type PCollection<Entity>
is not applicable for the arguments (DatastoreV1.Write
)
Also should I use com.google.cloud.datastore.Datastore
or com.google.datastore.v1.Entity
?
Upvotes: 0
Views: 226
Reputation: 306
You will need to convert your com.google.cloud.datastore.Entity to com.google.datastore.v1.Entity in RawToObjectConverter before using DatastoreIO.v1().write(), using toPb method
c.output(post.toPb());
Upvotes: 4