Shinya Yaginuma

Reputation: 17

Is it possible to stream data from Pub/Sub to Datastore using Dataflow?

I am trying to stream data from Pub/Sub to Datastore using Dataflow.

reference: https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/src/main/java/com/google/cloud/teleport/templates

I tried to build the template, but it does not work at all, so I think it is not possible.

Is that right? Please give me some advice.

Upvotes: 0

Views: 1899

Answers (1)

yoape

Reputation: 3315

You may have stumbled on a bug in that specific template. There are two separate issues in it. The first is the one answered in the SO question How to use google provided template [pubsub to Datastore]?, which points to the missing errorTag. The second is that the writer to Datastore uses a GroupByKey when it writes the entities to Datastore.

If you run the Maven compile command with the -e option, it will show you the error message: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. Why does this happen? Messages from Pub/Sub are streamed, not batched (which is exactly what we would expect), so there is no finite set of items coming in, just a never-ending stream of them. To work with that, we need to limit the stream to a window of items that aggregation functions such as GroupByKey can operate on. The DatastoreConverters class that helps write entities to Datastore checks whether we are trying to write the same key multiple times, and it does that using the GroupByKey function.

The simple solution is to give it a window to work with. Below, a third .apply(...) is added to the pipeline; it windows the stream together and lets you use the Datastore writer:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

... 

  public static void main(String[] args) {
    PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(PubsubToDatastoreOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    pipeline
        .apply(PubsubIO.readStrings()
            .fromTopic(options.getPubsubReadTopic()))
        .apply(TransformTextViaJavascript.newBuilder()
            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
            .setFunctionName(options.getJavascriptTextTransformFunctionName())
            .build())
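        // Added step: window the unbounded Pub/Sub stream into 1-second fixed
        // windows so the GroupByKey inside the Datastore writer has bounded input.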
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(WriteJsonEntities.newBuilder()
            .setProjectId(options.getDatastoreWriteProjectId())
            .setErrorTag(errorTag)
            .build());

    pipeline.run();
  }

Now there are other, and very possibly much better, ways of doing this, but this will get your template compiled and working. This example shows a FixedWindows window of 1 second; there are other ways to window the stream, check the documentation for that: Google DataFlow - Windowing. One such alternative, hinted at by the error message itself, is Window.triggering, sketched below.
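This is purely an illustrative sketch, not something taken from the template: keeping the global window but adding a processing-time trigger would also satisfy the GroupByKey requirement, at the cost of a bit more configuration.

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;

... 

        // Hypothetical alternative to the fixed window: stay in the global window
        // but fire a pane roughly one second after the first element arrives in it.
        .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())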

Compile your template with:

mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"
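If you want to double-check that the template file was actually written before launching anything (assuming you have gsutil available alongside gcloud):

gsutil ls gs://[YOUR_BUCKET_HERE]/templates/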

And then start the job with:

gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"
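To smoke-test the pipeline you can publish a message to the topic; the payload below is just a made-up example, the actual JSON has to match what your JavaScript transform and your entities expect:

gcloud pubsub topics publish [YOUR_PUBSUB_TOPIC_HERE] --message='{"name": "test-entity"}'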

Now you should see your job running in the GCP console:

[Screenshot: Dataflow job running in the GCP console]

Note that this specific solution and the chosen window mean a delay of up to a second before a Pub/Sub message ends up in Datastore. Shortening the window may help a little (see the sketch below), but to go meaningfully beyond that you would need a different pipeline than the one shown here.
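For example, a minimal tweak to the window length (everything else stays as in the pipeline above) could look like this:

        // 100 ms fixed windows instead of 1 s: lower latency per message,
        // but more and smaller groupings for the GroupByKey to process.
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(100))))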

Upvotes: 3
