Exceptions in a Google Cloud Dataflow pipeline writing to Cloud Bigtable

Time: 2022-05-08 14:38:41

While executing Dataflow pipelines, every once in a while we see these exceptions. Is there anything we can do about them? We have a fairly simple flow that reads a file from GCS and creates one record per line of the input file - roughly 1 million lines.


Also, what happens to the data inside the pipeline? Is it reprocessed, or is it lost in transit to Bigtable?


(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:428)
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284) 
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166) 
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244)
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335) 
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: connect timed out 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932) 
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299) ... 4 more

Is there anything we can do to harden our code?


And the Dataflow pipeline itself is quite simple:


Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setMaxNumWorkers(20);

Pipeline p = Pipeline.create(options);

CloudBigtableIO.initializeForWrite(p)
            .apply(TextIO.Read.from(options.getInputFile()))
            .apply(ParDo.of(new HBaseBigtableWriter(options)));
p.run();

The ParDo looks like:


public class HBaseBigtableWriter extends DoFn<String, Void> {
private Connection conn;
private BufferedMutator mutator;
private final CloudBigtableTableConfiguration btConfig;
// Not shown in the original snippet; assumed declaration for the aggregator used in finishBundle.
private final Aggregator<Integer, Integer> retriesExceptionAggregator =
        createAggregator("retriesExhaustedExceptions", new Sum.SumIntegerFn());

public HBaseBigtableWriter(CloudBigtableOptions options) {
    this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options);
}

@Override
public void startBundle(DoFn<String, Void>.Context c) throws Exception {
    super.startBundle(c);
    conn = new BigtableConnection(btConfig.toHBaseConfig());
    mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId()));
}

@Override
public void processElement(DoFn<String, Void>.ProcessContext c)  {
    Put put = new Put(....);
    // ...built from the input line; no sideInputs or anything
    put.addImmutable(...);
    mutator.mutate(put); // the line mentioned in the stack trace
} 

@Override
public void finishBundle(DoFn<String, Void>.Context c) throws Exception  {
    try {
        mutator.close();
    } catch (RetriesExhaustedWithDetailsException e) {
        retriesExceptionAggregator.addValue(1);
        List<Throwable> causes = e.getCauses();
        if (causes.size() == 1) {
            throw (Exception) causes.get(0);
        } else {
            throw e;
        }
    }
    finally {
        conn.close();
        super.finishBundle(c);
    }
}
}

Also this one is popping up every now and then.


java.util.concurrent.RejectedExecutionException: Task io.grpc.SerializingExecutor$TaskRunner@5a497f63 rejected from java.util.concurrent.ThreadPoolExecutor@49e90a5c[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112)
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398)
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256)
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230)
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180)
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121)
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253)
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168)
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285)
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175)
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162)
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110)
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179)
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69)
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424)
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316)

Also, with the Google SDK classes it looks like the same thing is happening - especially under load - e.g. Dataflow job 2015-09-10_10_26_26-7782438171725519247


(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times,
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258) 
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181) 
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613)

Any advice on these exceptions? Thanks!


1 Answer

#1



Closing a Connection and then doing a mutation could result in the stack traces you see (which I'm guessing happens when you stop a worker while buffered mutations are still in progress).
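
As a rough illustration (reusing the conn and mutator fields from the DoFn above), the ordering that can produce these traces versus a safe shutdown order looks roughly like this - a sketch of the failure mode, not a definitive fix:

// Problematic ordering: the gRPC channel behind the connection is already
// shutting down while a mutation is still being issued or buffered.
conn.close();          // channel executors begin shutting down...
mutator.mutate(put);   // ...so this write can be rejected or fail with UNKNOWN

// Safer ordering: drain and close the mutator first, then the connection.
mutator.flush();       // surfaces any buffered-write failures here
mutator.close();
conn.close();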


Can you please open a bug on our GitHub issue tracker? I think that may be the most effective way to diagnose this issue. https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues


If I read the stack trace correctly, it looks like you're not taking advantage of the CloudBigtableIO.writeToTable() method and that you're using a custom ParDo to write your data. If so, then the answers to your questions really depend on what you're doing in your custom ParDo as well as the dynamics of "stopping the worker."
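
For reference, a minimal sketch of that approach - emitting Mutations from the ParDo and letting CloudBigtableIO.writeToTable() own the connection/BufferedMutator lifecycle. The row key and the "cf"/"col" family and qualifier names below are made-up placeholders, not anything from the original job:

CloudBigtableTableConfiguration btConfig =
        CloudBigtableTableConfiguration.fromCBTOptions(options);

Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p);

p.apply(TextIO.Read.from(options.getInputFile()))
 // Turn each input line into an HBase Mutation; the sink handles batching,
 // flushing, and closing, instead of user code in finishBundle.
 .apply(ParDo.of(new DoFn<String, Mutation>() {
     @Override
     public void processElement(ProcessContext c) {
         String line = c.element();
         Put put = new Put(Bytes.toBytes(line));                    // placeholder row key
         put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"),
                 Bytes.toBytes(line));                              // placeholder cell
         c.output(put);
     }
 }))
 .apply(CloudBigtableIO.writeToTable(btConfig));

p.run();

With this shape, buffered mutations are flushed by the sink's own finishBundle (the CloudBigtableSingleTableWriteFn seen in your last stack trace), so closing a Connection while mutations are in flight is no longer something user code has to manage.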

