Reputation: 302
I'm trying to read a Datastore table with 300k records from Dataflow (DatastoreIO), and I'm getting the following error from the Datastore API:
400 Bad Request a composite filter must have at least one sub-filter
Code:
Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(options.getKind());
Query query = q.build();
(...)
Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.readFrom(options.getDataset(), query).named("ReadFromDatastore"))...
Error (it occurs 4 times before the job exits):
[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all <<<
[INFO]
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
Mar 14, 2015 8:58:48 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 41 files. Enable logging at DEBUG level to see which files will be staged.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 41 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 14, 2015 8:59:12 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 40 files cached
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source queryLatestStatisticsTimestamp
INFO: Query for latest stats timestamp of dataset primebus01 took 854ms
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source getEstimatedSizeBytes
INFO: Query for per-kind statistics took 233ms
Dataflow SDK version: manual_build
Mar 14, 2015 8:59:16 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/primebus01/dataflow/job/2015-03-14_04_59_16-991787963429613914
Submitted job: 2015-03-14_04_59_16-991787963429613914
2015-03-14T11:59:17.796Z: (ab0c86e705e7447a): Expanding GroupByKey operations into optimizable parts.
2015-03-14T11:59:17.800Z: (ab0c86e705e749b0): Annotating graph with Autotuner information.
2015-03-14T11:59:17.806Z: (ab0c86e705e74ee6): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-14T11:59:17.812Z: (ab0c86e705e7441c): Fusing consumer GetContent into ReadFromDatastore
2015-03-14T11:59:17.815Z: (ab0c86e705e74952): Fusing consumer CountWords/Count.PerElement/Init into CountWords/ExtractWords
2015-03-14T11:59:17.818Z: (ab0c86e705e74e88): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Read
2015-03-14T11:59:17.820Z: (ab0c86e705e743be): Fusing consumer WriteLines into CountWords/FormatCounts
2015-03-14T11:59:17.822Z: (ab0c86e705e748f4): Fusing consumer CountWords/FormatCounts into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract
2015-03-14T11:59:17.824Z: (ab0c86e705e74e2a): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Write into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput
2015-03-14T11:59:17.826Z: (ab0c86e705e74360): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract into CountWords/Count.PerElement/Sum.PerKey/GroupedValues
2015-03-14T11:59:17.828Z: (ab0c86e705e74896): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial
2015-03-14T11:59:17.830Z: (ab0c86e705e74dcc): Fusing consumer CountWords/ExtractWords into GetContent
2015-03-14T11:59:17.832Z: (ab0c86e705e74302): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial into CountWords/Count.PerElement/Init
2015-03-14T11:59:17.843Z: (ab0c86e705e74d10): Adding StepResource setup and teardown to workflow graph.
2015-03-14T11:59:17.850Z: (ab0c86e705e7477c): Not adding lease related steps.
2015-03-14T11:59:17.864Z: (ac5ca5b613993974): Starting the input generators.
2015-03-14T11:59:17.882Z: (9a0f95eb7a7962f5): Adding workflow start and stop steps.
2015-03-14T11:59:17.884Z: (9a0f95eb7a796623): Assigning stage ids.
2015-03-14T11:59:18.290Z: (eb8131b6a76f5248): Starting worker pool setup.
2015-03-14T11:59:18.295Z: (eb8131b6a76f53ac): Starting 3 workers...
2015-03-14T11:59:18.318Z: S01: (1174d086003eadad): Executing operation CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Create
2015-03-14T11:59:18.345Z: (d91fb5c6a16bad02): Value "CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Session" materialized.
2015-03-14T11:59:18.367Z: S02: (1174d086003ea94c): Executing operation ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum....
2015-03-14T12:00:19.839Z: (9db26953adb2a181): java.io.IOException: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:266)
at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:239)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:173)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:120)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:129)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:94)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:118)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:599)
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:590)
at com.google.cloud.dataflow.sdk.io.Source$WindowedReaderWrapper.start(Source.java:178)
at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:257)
... 14 more
Caused by: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:630)
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:597)
... 17 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
a composite filter must have at least one sub-filter
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054)
at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
... 21 more
2015-03-14T12:00:24.850Z: (7ffa05cd66cd6940): Failed task is going to be retried.
................................
2015-03-14T12:01:02.703Z: (1174d086003eaff0): Executing failure step failure1
2015-03-14T12:01:02.707Z: (1174d086003ea342): Workflow failed. Causes: (ac5ca5b613993837): Map task completion for Step "ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum...." failed. Causes: (9013e6edb4cda414): Task has been attempted unsuccessfully 4 times, the maximum allowed.
2015-03-14T12:01:02.742Z: (157f9c08481a25c5): Stopping the input generators.
2015-03-14T12:01:02.756Z: (697e8bf226b989af): Cleaning up.
2015-03-14T12:01:02.765Z: (697e8bf226b98f98): Tearing down pending resources...
2015-03-14T12:01:02.771Z: (697e8bf226b98581): Starting worker pool teardown.
2015-03-14T12:01:02.776Z: (697e8bf226b9880d): Stopping worker pool...
This raises some other questions:
1 - In this case, am I supposed to define a filter?
2 - What criteria does Dataflow use to split the job into parallel reads?
3 - Is there an easier way to dump a big table from Datastore?
Thanks.
Upvotes: 0
Views: 1189
Reputation: 2247
The issue with QuerySplitter was fixed in Google Cloud Dataflow SDK for Java, version 1.1.0, released on Sept. 15th.
As of Google Cloud Dataflow SDK for Java, version 1.2.0, released this week, namespace support has been added. Please see the code and documentation in DatastoreIO.java for more information.
Upvotes: 0
Reputation: 17913
I believe I have isolated the problems you're having:
For now, if you are using namespaces, it seems you're out of luck in terms of parallelizing the read from Datastore :( If the bulk of your processing happens after the read (i.e., you do a non-trivial amount of work per Datastore entity), your current best bet is to write an (essentially non-parallel) pipeline that dumps the entities to GCS, and a second pipeline that reads the entities from there and processes them in parallel.
The first pipeline would look something like this:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(DatastoreIO.readFrom(dataset, query).named("ReadFromDatastore"))
    .apply(ParDo.of(new ConvertEntitiesFn())).setCoder(AvroCoder.of(MyEntity.class))
    .apply(AvroIO.Write.named("WriteEntitiesToGCS")
        .to("gs://your-bucket/path/to/temp.avro")
        .withSchema(MyEntity.class)
        .withNumShards(1));
where MyEntity is your class representing entities of your kind, and ConvertEntitiesFn is a DoFn<DatastoreV1.Entity, MyEntity> doing the conversion.
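For concreteness, a minimal sketch of those two classes might look like the following (the field name "content" and the property lookup are illustrative assumptions, not part of the answer; adjust them to your kind):

import java.io.Serializable;
import java.util.Map;

import com.google.api.services.datastore.DatastoreV1;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// A plain POJO that AvroCoder can serialize; replace the field(s) with the
// properties of your own kind (the "content" field here is hypothetical).
class MyEntity implements Serializable {
  String content;
  MyEntity() {}  // AvroCoder needs a no-arg constructor
  MyEntity(String content) { this.content = content; }
}

// Converts each Datastore entity into the Avro-codable POJO above.
class ConvertEntitiesFn extends DoFn<DatastoreV1.Entity, MyEntity> {
  @Override
  public void processElement(ProcessContext c) {
    DatastoreV1.Entity entity = c.element();
    // Illustrative lookup of a string property named "content".
    Map<String, DatastoreV1.Value> props = DatastoreHelper.getPropertyMap(entity);
    String content = props.containsKey("content")
        ? props.get("content").getStringValue()
        : "";
    c.output(new MyEntity(content));
  }
}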
Make sure to run this pipeline using DirectPipelineRunner (similarly to how writeDataToDatastore does it in the DatastoreWordCount.java example); this bypasses the query-splitting stage.
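One way to select the runner programmatically (rather than passing --runner=DirectPipelineRunner on the command line) is sketched below, using the standard SDK options classes:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

// Force the local DirectPipelineRunner for the dump pipeline, so the
// query-splitting step on the service is never invoked.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);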
The second pipeline would look something like this:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(AvroIO.Read.named("ReadEntitiesFromGCS")
        .from("gs://your-bucket/path/to/temp.avro")
        .withSchema(MyEntity.class))
    .apply(/* the rest of your pipeline */);
I realize that this is not a great workaround. Please stay tuned for updates to the Datastore library.
Upvotes: 3