ganesh_patil

Reputation: 366

Initializing external service connections in Beam

I am writing a Dataflow streaming pipeline. In one of the transformations, a DoFn, I want to access an external service, in this case Datastore.

Is there any best practice for this sort of initialization step? I don't want to create the datastore connection object for every processElement method call.

Upvotes: 3

Views: 1319

Answers (1)

Pablo

Reputation: 11021

In the Dataflow SDK, the simplest thing you can do is add a check that initializes your external service when the first element is processed:

class DatastoreCallingDoFn extends DoFn {

    // transient: the handle is created on the worker, not serialized with the DoFn
    private transient ExtServiceHandle handle = null;

    private ExtServiceHandle initializeConnection() {
      // ...
    }

    @Override
    public void processElement(ProcessContext c) {
      // Open the connection lazily, the first time an element arrives
      if (handle == null) {
        handle = initializeConnection();
      }
      // Process elements
    }
}
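
To see why the null check is enough, here is a minimal, Beam-free sketch of the same pattern. It assumes the DoFn is shipped to workers via Java serialization and that the service handle itself is not serializable, so the field has to be transient and rebuilt on first use. LazyFn, process and the counter are hypothetical names made up for this illustration; ExtServiceHandle is a dummy stand-in, not a real Datastore client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyInitDemo {

    // Hypothetical stand-in for a real client; deliberately NOT Serializable.
    static class ExtServiceHandle {
        static int connectionsOpened = 0;

        ExtServiceHandle() {
            connectionsOpened++; // count how many "connections" get created
        }

        String lookup(String key) {
            return "value-for-" + key;
        }
    }

    // Stand-in for a DoFn: serializable, with a transient, lazily created handle.
    static class LazyFn implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient fields are skipped by serialization and come back as null
        private transient ExtServiceHandle handle;

        String process(String element) {
            if (handle == null) {
                handle = new ExtServiceHandle(); // runs once per deserialized instance
            }
            return handle.lookup(element);
        }
    }

    // Serialize and deserialize, roughly what happens when a function is sent to a worker.
    static LazyFn roundTrip(LazyFn fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyFn) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazyFn fn = roundTrip(new LazyFn());
        for (String element : new String[] {"a", "b", "c"}) {
            System.out.println(fn.process(element));
        }
        // Three elements were processed, but only one handle was created.
        System.out.println("connections opened: " + ExtServiceHandle.connectionsOpened);
    }
}
```

The handle is created once per deserialized instance rather than once per element, which is what the question asks for. If several instances run on the same worker, each one still opens its own connection.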

If you are using Beam, you can use the @Setup annotation to mark a method in your DoFn that performs one-time setup, such as initializing the Datastore connection.

class DatastoreCallingDoFn extends DoFn {
    private transient ExtServiceHandle handle;

    @Setup
    public void initializeDatastoreConnection() {
      // ... create the connection once per DoFn instance and store it in handle
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // ... process each element -- setup will have been called
    }
}

This is similar to the answer in this question.

Upvotes: 5
