Sebastian J.

Reputation: 762

Exceptions in Google Cloud Dataflow pipelines to Cloud Bigtable

When executing Dataflow pipelines, we see these exceptions every once in a while. Is there anything we can do about them? We have a quite simple flow that reads a file from GCS and creates a record per line of the input file - about 1 million lines in total.

Also, what happens to the data inside the pipeline? Is it reprocessed, or is it lost in transit to Bigtable?

(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:428)
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284) 
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166) 
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244)
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335) 
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: connect timed out 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932) 
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299) ... 4 more

Is there anything we can do to harden our code?

And the pipeline itself is quite simple:

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setMaxNumWorkers(20);

Pipeline p = Pipeline.create(options);

CloudBigtableIO.initializeForWrite(p)
            .apply(TextIO.Read.from(options.getInputFile()))
            .apply(ParDo.of(new HBaseBigtableWriter(options)));
p.run();

The ParDo looks like:

public class HBaseBigtableWriter extends DoFn<String, Void> {
private Connection conn;
private BufferedMutator mutator;
private final CloudBigtableTableConfiguration btConfig;

public HBaseBigtableWriter(CloudBigtableOptions options) {
    this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options);
}

@Override
public void startBundle(DoFn<String, Void>.Context c) throws Exception {
    super.startBundle(c);
    conn = new BigtableConnection(btConfig.toHBaseConfig());
    mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId()));
}

@Override
public void processElement(DoFn<String, Void>.ProcessContext c) {
    Put put = new Put(....);
    // built based on the input line.. no sideInputs or anything
    put.addImmutable(...);
    mutator.mutate(put); // the line mentioned in the stack trace
}

@Override
public void finishBundle(DoFn<String, Void>.Context c) throws Exception  {
    try {
        mutator.close();
    } catch (RetriesExhaustedWithDetailsException e) {
        retriesExceptionAggregator.addValue(1);
        List<Throwable> causes = e.getCauses();
        if (causes.size() == 1) {
            throw (Exception) causes.get(0);
        } else {
            throw e;
        }
    } finally {
        conn.close();
        super.finishBundle(c);
    }
}
}

Also, this one pops up every now and then:

java.util.concurrent.RejectedExecutionException: Task io.grpc.SerializingExecutor$TaskRunner@5a497f63 rejected from java.util.concurrent.ThreadPoolExecutor@49e90a5c[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112)
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398)
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256)
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230)
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180)
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121)
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253)
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168)
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285)
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175)
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162)
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110)
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179)
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69)
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424)
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316)

The same thing also seems to happen with the Google SDK classes, especially under load - e.g. Dataflow job 2015-09-10_10_26_26-7782438171725519247:

(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times,
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258) 
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181) 
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613)

Any advice on these exceptions? Thanks!

Upvotes: 1

Views: 1101

Answers (1)

Solomon Duskis

Reputation: 2711

Closing a Connection and then doing a mutation could result in the stack traces you see (which I'm guessing happens when you stop a worker while buffered mutations are still in progress).
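To illustrate (a rough sketch only, reusing the conn/mutator field names from your DoFn above - not your actual code path):

// If the Connection is closed while mutations are still buffered or in flight,
// the underlying gRPC executor is already shutting down, which would match the
// RejectedExecutionException ("Shutting down, pool size = 2, ...") above.
conn.close();
mutator.mutate(put);   // could be rejected / fail with StatusRuntimeException

// Safer ordering: flush and close the mutator first, then close the connection.
mutator.flush();
mutator.close();
conn.close();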

Can you please open a bug on our github issue tracker? I think that may be the most effective way to diagnose this issue. https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues

If I read the stack trace correctly, it looks like you're not taking advantage of the CloudBigtableIO.writeToTable() method and that you're using a custom ParDo to write your data. If so, then the answers to your questions really depend on what you're doing in your custom ParDo as well as the dynamics of "stopping the worker."
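For comparison, here is a rough sketch of what the writeToTable() approach could look like (the line-to-Put conversion, row key, and column names are made up for illustration - see the cloud-bigtable-dataflow samples for the exact API):

CloudBigtableTableConfiguration config =
        CloudBigtableTableConfiguration.fromCBTOptions(options);

Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p);

p.apply(TextIO.Read.from(options.getInputFile()))
 .apply(ParDo.of(new DoFn<String, Mutation>() {
     @Override
     public void processElement(ProcessContext c) {
         // Illustrative only: derive a row key and a single cell from the input line.
         Put put = new Put(Bytes.toBytes(c.element()));
         put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(c.element()));
         c.output(put);
     }
 }))
 .apply(CloudBigtableIO.writeToTable(config));

p.run();

That way the connector owns the Connection/BufferedMutator lifecycle (the CloudBigtableSingleTableWriteFn in your last stack trace), so your DoFn doesn't have to manage them in startBundle/finishBundle.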

Upvotes: 2
