Reputation: 11
We ran a Google Dataflow batch job that publishes data records to a Pub/Sub topic, and we have a separate streaming job that pulls those records from Pub/Sub and writes updates to our Elasticsearch index using the “PubSub to Elasticsearch” streaming template. However, we had to terminate the streaming job that writes to Elasticsearch because we encountered multiple “Error writing to ES after 1 attempt(s). No more attempts allowed” errors.
The job that reads from Pub/Sub and writes to Elasticsearch is writing to an index that is not a data stream. It actually needs to update the same document in the index, potentially multiple times.
Does this job need to write to a data stream? If not, how can we configure the Dataflow job to slow down its writes so that it doesn't overwhelm Elasticsearch?
I have seen that increasing thread_pool.index.bulk.queue_size can help, but it isn't recommended.
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: Error writing to ES after 1 attempt(s). No more attempts allowed
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBundleFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1751)
org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:111)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:538)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Error writing to ES after 1 attempt(s). No more attempts allowed
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.handleRetry(ElasticsearchIO.java:2569)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushBatch(ElasticsearchIO.java:2519)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2435)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.finishBundle(ElasticsearchIO.java:2396)
Upvotes: 0
Views: 188
Reputation: 6023
From the official docs for the template:
The Dataflow template uses Elasticsearch's data streams feature to store time series data across multiple indices while giving you a single named resource for requests
There are parameters you can use to control this, and it sounds like you will want to change the defaults (see the sketch after the list):
bulkInsertMethod
(Optional) Whether to use INDEX (index, allows upserts) or CREATE (create, errors on duplicate _id) with Elasticsearch bulk requests. Default: CREATE.
usePartialUpdate
(Optional) Whether to use partial updates (update rather than create or index, allowing partial docs) with Elasticsearch requests. Default: false.
maxRetryAttempts
(Optional) Max retry attempts; must be > 0. Default: no retries.
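For your case that likely means bulkInsertMethod=INDEX, usePartialUpdate=true, and a maxRetryAttempts value greater than 1, so that repeated writes to the same _id succeed and transient bulk failures are retried instead of aborting after one attempt. If you later move off the template to your own Beam pipeline, the same knobs are exposed on the underlying ElasticsearchIO connector, which your stack trace shows the template is built on. A minimal sketch in Java; the host, index name, id field, batch size, and retry budget below are placeholder assumptions, not values from your job:

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.joda.time.Duration;

// Placeholder host, index, id field, batch size, and retry budget; adjust for your cluster.
ElasticsearchIO.Write esWrite =
    ElasticsearchIO.write()
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] {"https://my-es-host:9200"}, "my-index"))
        // Analogue of the template's usePartialUpdate: partial-update existing documents.
        .withUsePartialUpdate(true)
        // Derive a stable _id from each JSON record so updates hit the same document.
        .withIdFn(doc -> doc.get("id").asText())
        // Smaller bulk batches put less pressure on the cluster per request.
        .withMaxBatchSize(500)
        // Analogue of maxRetryAttempts: retry failed bulk requests with backoff
        // instead of failing after a single attempt.
        .withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(5, Duration.standardMinutes(2)));

You would then apply esWrite to your PCollection of JSON document strings; lowering the batch size trades some throughput for less load on the cluster, which addresses the "slow down and not overwhelm Elastic" part of the question.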
Upvotes: 0