RichardB

Reputation: 46

Using Spanner within Apache Beam Dataflow

I am trying to add a Spanner connection within an Apache Beam ParDo(DoFn). I need to look up some rows as part of the ParDo. The Dataflow job creates a number of workers (usually 4 max), and I use the startBundle and finishBundle methods to open and close the Spanner connection for the worker's lifetime. Then, within the processElement method, I perform the lookup for each item, passing the DatabaseClient and using a singleUseReadOnlyTransaction.

I should add that this is running as a Dataflow job on GCP.

Some code to illustrate this.

private static CustomDoFn<String, TransactionImport> processRow = new CustomDoFn<String, TransactionImport>(){
    private static final long serialVersionUID = 1L;

    private Spanner spanner = null;
    private DatabaseClient dbClient = null;

    @StartBundle
    public void startBundle(StartBundleContext c){
      TransactionFileOptions options = c.getPipelineOptions().as(TransactionFileOptions.class);

      com.google.cloud.spanner.SpannerOptions spannerOptions = com.google.cloud.spanner.SpannerOptions.newBuilder().build();
      spanner = spannerOptions.getService();
      String spannerProjectID = options.getSpannerProjectId();
      String spannerInstanceID = options.getSpannerInstanceId();
      String spannerDatabaseID = options.getSpannerDatabaseId();

      DatabaseId db = DatabaseId.of(spannerProjectID, spannerInstanceID, spannerDatabaseID);
      dbClient = spanner.getDatabaseClient(db);
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext c){
        spanner.close();  
    }

    @ProcessElement
    public void processElement(DoFn<String, TransactionImport>.ProcessContext c) throws Exception {
        TransactionImport transactionImport = new TransactionImport();

        Statement statement = Statement.newBuilder("SELECT * FROM Table1 WHERE Name = @Name")
                .bind("Name").to(c.element())
                .build();

        ResultSet resultSet = dbClient.singleUseReadOnlyTransaction().executeQuery(statement);

        // set some value on transactionImport, dependent on the retrieved value

        c.output(transactionImport);
    }
};

This always results in the Dataflow job not completing, and when I check the logs I see:

Processing stuck in step Process Rows for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
at com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly(Uninterruptibles.java:233)
at com.google.cloud.spanner.SessionPool$Waiter.take(SessionPool.java:411)
at com.google.cloud.spanner.SessionPool$Waiter.access$3300(SessionPool.java:399)
at com.google.cloud.spanner.SessionPool.getReadSession(SessionPool.java:754)
at com.google.cloud.spanner.DatabaseClientImpl.singleUseReadOnlyTransaction(DatabaseClientImpl.java:52)
at com.mycompany.pt.SpannerDataAccess.getBinDetails(SpannerDataAccess.java:197)
at com.mycompany.pt.transactionFiles.TransactionFileDataflow$1.processLine(TransactionFileDataflow.java:411)
at com.mycompany.pt.transactionFiles.TransactionFileDataflow$1.processElement(TransactionFileDataflow.java:336)
at com.mycompany.pt.transactionFiles.TransactionFileDataflow$1$DoFnInvoker.invokeProcessElement(Unknown Source)


Does anyone have any experience using Spanner like this within a ParDo?

Upvotes: 1

Views: 1814

Answers (1)

Igor Bernstein

Reputation: 581

I'm not a Spanner expert, but maybe I can help:

  1. You should use @Setup/@Teardown to connect to and disconnect from Spanner. @{Start,Finish}Bundle gets called multiple times over the lifetime of a worker (see the sketch after this list). See here for more details: https://beam.apache.org/documentation/execution-model/#bundling-and-persistence

  2. Does your processElement method ever emit an element using c.output(...)? If not, Beam will think your pipeline is stuck.
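
To make point 1 concrete, here is a minimal sketch of the @Setup/@Teardown shape, assuming the project, instance and database IDs are passed into the DoFn's constructor (SpannerLookupFn is a made-up name, and TransactionImport is the type from the question). It also reads and closes the ResultSet with try-with-resources so the session is returned to the pool, which is where the posted trace is waiting (SessionPool.getReadSession):

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.transforms.DoFn;

class SpannerLookupFn extends DoFn<String, TransactionImport> {
    private static final long serialVersionUID = 1L;

    private final String projectId;
    private final String instanceId;
    private final String databaseId;
    private transient Spanner spanner;
    private transient DatabaseClient dbClient;

    SpannerLookupFn(String projectId, String instanceId, String databaseId) {
        this.projectId = projectId;
        this.instanceId = instanceId;
        this.databaseId = databaseId;
    }

    @Setup
    public void setup() {
        // Runs once per DoFn instance on a worker, not once per bundle.
        spanner = SpannerOptions.newBuilder().build().getService();
        dbClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
    }

    @Teardown
    public void teardown() {
        if (spanner != null) {
            spanner.close();
        }
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Statement statement = Statement.newBuilder("SELECT * FROM Table1 WHERE Name = @Name")
                .bind("Name").to(c.element())
                .build();
        // try-with-resources closes the transaction and result set, returning the
        // session to the pool; leaked sessions are what eventually block in
        // SessionPool.getReadSession(), as in the posted stack trace.
        try (ReadOnlyTransaction txn = dbClient.singleUseReadOnlyTransaction();
             ResultSet resultSet = txn.executeQuery(statement)) {
            TransactionImport transactionImport = new TransactionImport();
            while (resultSet.next()) {
                // populate transactionImport from resultSet
            }
            c.output(transactionImport);
        }
    }
}

With this shape each worker keeps a single long-lived Spanner client, and each element's read just borrows and promptly returns a pooled session.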

Upvotes: 1
