André Salvati
André Salvati

Reputation: 302

Issue with Google Dataflow using DatastoreIO

I'm trying to read a Datastore table with 300k records from Dataflow (DatastoreIO) and getting the following error from Datastore API.

400 Bad Request a composite filter must have at least one sub-filter

Code:

Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(options.getKind());
Query query = q.build();

(...)

Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.readFrom(options.getDataset(), query).named("ReadFromDatastore"))...

Error (happens 4 times before exit):

[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all >>>
[INFO] 
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all <<<
[INFO] 
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
Mar 14, 2015 8:58:48 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 41 files. Enable logging at DEBUG level to see which files will be staged.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 41 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 14, 2015 8:59:12 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 40 files cached
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source queryLatestStatisticsTimestamp
INFO: Query for latest stats timestamp of dataset primebus01 took 854ms
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source getEstimatedSizeBytes
INFO: Query for per-kind statistics took 233ms
Dataflow SDK version: manual_build
Mar 14, 2015 8:59:16 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/primebus01/dataflow/job/2015-03-14_04_59_16-991787963429613914
Submitted job: 2015-03-14_04_59_16-991787963429613914
2015-03-14T11:59:17.796Z: (ab0c86e705e7447a): Expanding GroupByKey operations into optimizable parts.
2015-03-14T11:59:17.800Z: (ab0c86e705e749b0): Annotating graph with Autotuner information.
2015-03-14T11:59:17.806Z: (ab0c86e705e74ee6): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-14T11:59:17.812Z: (ab0c86e705e7441c): Fusing consumer GetContent into ReadFromDatastore
2015-03-14T11:59:17.815Z: (ab0c86e705e74952): Fusing consumer CountWords/Count.PerElement/Init into CountWords/ExtractWords
2015-03-14T11:59:17.818Z: (ab0c86e705e74e88): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Read
2015-03-14T11:59:17.820Z: (ab0c86e705e743be): Fusing consumer WriteLines into CountWords/FormatCounts
2015-03-14T11:59:17.822Z: (ab0c86e705e748f4): Fusing consumer CountWords/FormatCounts into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract
2015-03-14T11:59:17.824Z: (ab0c86e705e74e2a): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Write into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput
2015-03-14T11:59:17.826Z: (ab0c86e705e74360): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract into CountWords/Count.PerElement/Sum.PerKey/GroupedValues
2015-03-14T11:59:17.828Z: (ab0c86e705e74896): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial
2015-03-14T11:59:17.830Z: (ab0c86e705e74dcc): Fusing consumer CountWords/ExtractWords into GetContent
2015-03-14T11:59:17.832Z: (ab0c86e705e74302): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial into CountWords/Count.PerElement/Init
2015-03-14T11:59:17.843Z: (ab0c86e705e74d10): Adding StepResource setup and teardown to workflow graph.
2015-03-14T11:59:17.850Z: (ab0c86e705e7477c): Not adding lease related steps.
2015-03-14T11:59:17.864Z: (ac5ca5b613993974): Starting the input generators.
2015-03-14T11:59:17.882Z: (9a0f95eb7a7962f5): Adding workflow start and stop steps.
2015-03-14T11:59:17.884Z: (9a0f95eb7a796623): Assigning stage ids.
2015-03-14T11:59:18.290Z: (eb8131b6a76f5248): Starting worker pool setup.
2015-03-14T11:59:18.295Z: (eb8131b6a76f53ac): Starting 3 workers...
2015-03-14T11:59:18.318Z: S01: (1174d086003eadad): Executing operation CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Create
2015-03-14T11:59:18.345Z: (d91fb5c6a16bad02): Value "CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Session" materialized.
2015-03-14T11:59:18.367Z: S02: (1174d086003ea94c): Executing operation ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum....
2015-03-14T12:00:19.839Z: (9db26953adb2a181): java.io.IOException: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:266)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:239)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:173)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:120)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:129)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:94)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:118)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:599)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:590)
    at com.google.cloud.dataflow.sdk.io.Source$WindowedReaderWrapper.start(Source.java:178)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:257)
    ... 14 more
Caused by: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:630)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:597)
    ... 17 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
a composite filter must have at least one sub-filter
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 21 more

2015-03-14T12:00:24.850Z: (7ffa05cd66cd6940): Failed task is going to be retried.


................................


2015-03-14T12:01:02.703Z: (1174d086003eaff0): Executing failure step failure1
2015-03-14T12:01:02.707Z: (1174d086003ea342): Workflow failed. Causes: (ac5ca5b613993837): Map task completion for Step "ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum...." failed. Causes: (9013e6edb4cda414): Task has been attempted unsuccessfully 4 times, the maximum allowed.
2015-03-14T12:01:02.742Z: (157f9c08481a25c5): Stopping the input generators.
2015-03-14T12:01:02.756Z: (697e8bf226b989af): Cleaning up.
2015-03-14T12:01:02.765Z: (697e8bf226b98f98): Tearing down pending resources...
2015-03-14T12:01:02.771Z: (697e8bf226b98581): Starting worker pool teardown.
2015-03-14T12:01:02.776Z: (697e8bf226b9880d): Stopping worker pool...

This brings other questions:

1 - In this case, am I supposed to define a filter?

2 - What is the criteria that Dataflow uses to split the jobs?

3 - Is there an easier way to dump a big table from Datastore?

Thanks.

Upvotes: 0

Views: 1189

Answers (2)

Dan Halperin
Dan Halperin

Reputation: 2247

The issue with QuerySplitter was fixed with Google Cloud Dataflow SDK for Java, version 1.1.0 on Sept. 15th.

And as of Google Cloud Dataflow SDK for Java, version 1.2.0, released this week, namespace support has been added. Please see the code and documentation in DatastoreIO.java for more information.

Upvotes: 0

jkff
jkff

Reputation: 17913

I believe I have isolated the problems you're having:

  • There is a bug in the way DatastoreIO queries are split (we use the QuerySplitter; under some conditions - in particular, when the query returns very few results - it can produce invalid queries for the parallel shards, containing an empty composite filter). I have notified the Datastore team about this bug and they are working on it.
  • According to your code snippet, your query is simply reading all entities of a certain kind from your dataset. The only way how it could trigger the problem above is if this query returned zero results. This is possible in case you're using namespaces - unfortunately QuerySplitter does not support namespaces right now either. The team is considering removing this limitation, but there is no public timeframe.
  • The Dataflow SDK error messages should be more clear about what's going on, and the SDK should detect these errors earlier. I'll fix this.

For now, if you are using namespaces, then it seems that you're out of luck in terms of parallelizing the process of reading from Datastore :( If the bulk of your processing happens after that (i.e. if you do a non-trivial amount of work per Datastore entity), then your current best bet would be to write a (essentially non-parallel) pipeline to dump the entities to GCS, and a second pipeline to read the entities from there and process them in parallel.

The first pipeline would look something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(DatastoreIO.readFrom(dataset, query).named("ReadFromDatastore"))
        .apply(ParDo.of(new ConvertEntitiesFn())).setCoder(AvroCoder.of(MyEntity.class))
        .apply(AvroIO.Write.named("WriteEntitiesToGCS")
                           .to("gs://your-bucket/path/to/temp.avro")
                           .withSchema(MyEntity.class)
                           .withNumShards(1))

where MyEntity is your class to represent your kind of entities, and ConvertEntitiesFn is a DoFn<DatastoreV1.Entity, MyEntity> doing the conversion.

Make sure to run this pipeline using DirectPipelineRunner (similarly to how writeDataToDatastore does it in DatastoreWordCount.java example). This will bypass the query splitting stage.

and the second something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(AvroIO.Read.named("ReadEntitiesFromGCS")
                          .from("gs://your-bucket/path/to/temp.avro")
                          .withSchema(MyEntity.class))
        .apply(the rest of your pipeline)

I realize that this is not a great work-around. please stay tuned for updates to the Datastore library.

Upvotes: 3

Related Questions