Reputation: 115
In my Dataflow job, I need to initialize a Config factory and write certain messages to an audit log before the actual processing begins.
I have placed the Config factory initialization code and the audit logging in a parent class, PlatformInitializer, which my main pipeline class extends.
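```java
import java.beans.PropertyVetoException;
import java.io.Serializable;

public class CustomJob extends PlatformInitializer implements Serializable {

    public static void main(String[] args) throws PropertyVetoException {
        CustomJob myCustomjob = new CustomJob();
        // Initialize the config factories and write the initial audit messages
        myCustomjob.initialize();
        // Build and trigger the Dataflow job (parallelRead is not shown)
        myCustomjob.parallelRead(args);
    }
}
```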
As a result, I also had to implement the Serializable interface in my pipeline class, because Beam was throwing the error java.io.NotSerializableException: org.devoteam.CustomJob.
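A likely cause of that exception (an assumption, since parallelRead is not shown) is a DoFn written as an anonymous or inner class inside an instance method of CustomJob. If such a class touches the outer object's fields, it keeps a hidden reference to the enclosing instance, and Beam serializes each DoFn together with everything it references. The plain-Java sketch below reproduces the effect without Beam; SerializableFn, Job and StaticFn are hypothetical stand-ins for a DoFn, the pipeline class and a static DoFn.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {

    // Hypothetical stand-in for a Beam DoFn, which is also Serializable.
    interface SerializableFn extends Serializable {
        String apply(String input);
    }

    // Stand-in for the pipeline class: deliberately NOT Serializable.
    static class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        // The anonymous class reads Job.this.prefix, so it keeps a hidden
        // reference to the enclosing Job; serializing it pulls Job in too.
        SerializableFn anonymousFn() {
            return new SerializableFn() {
                @Override
                public String apply(String input) {
                    return prefix + input;
                }
            };
        }
    }

    // A static nested class has no enclosing instance; only its own fields are serialized.
    static class StaticFn implements SerializableFn {
        private final String prefix;

        StaticFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String apply(String input) {
            return prefix + input;
        }
    }

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Job("job-").anonymousFn())); // false
        System.out.println(serializes(new StaticFn("job-")));          // true
    }
}
```

If this is the cause, moving the DoFns into static nested or top-level classes would usually let CustomJob drop the Serializable interface.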
Inside PlatformInitializer, I have an initialize() method that contains the initialization logic for the config factory and also writes some initial audit messages.
```java
public class PlatformInitializer {
    public void initialize() {
        // Configfactory factory = new Configfactory();
        // CustomLogger.log("JOB-BEGIN-EVENT " + <current timestamp>);
    }
}
```
My question is: is this the right way to invoke code that needs to run before the pipeline begins execution?
Upvotes: 0
Views: 520
Reputation: 565
The JvmInitializer interface (https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/harness/JvmInitializer.html) may be appropriate for one-time, process-level initialization. For initializing long-lived or expensive objects (e.g. connections) that belong to a specific DoFn / PTransform, the DoFn lifecycle methods are usually more appropriate (see @chamikara's answer).
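A minimal sketch of the JvmInitializer route, assuming the Beam Java SDK and Google's AutoService annotation processor are on the classpath. The class name PlatformJvmInitializer and the log message are hypothetical. Implementations are discovered through java.util.ServiceLoader, which is why the class is registered with @AutoService here.

```java
import com.google.auto.service.AutoService;
import java.time.Instant;
import java.util.logging.Logger;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

// @AutoService generates the META-INF/services/org.apache.beam.sdk.harness.JvmInitializer
// entry at compile time, so ServiceLoader can find this class on the worker.
@AutoService(JvmInitializer.class)
public class PlatformJvmInitializer implements JvmInitializer {

    private static final Logger LOG =
        Logger.getLogger(PlatformJvmInitializer.class.getName());

    // onStartup() is left at its default no-op: it runs before basic services
    // such as logging are initialized, so beforeProcessing is the safer hook.

    @Override
    public void beforeProcessing(PipelineOptions options) {
        // Runs once per worker JVM, after logging is available and before
        // any element is processed.
        // Hypothetical: this is where "new Configfactory()" could be built.
        LOG.info("WORKER-INIT-EVENT " + Instant.now() + " job=" + options.getJobName());
    }
}
```

Note that both hooks run inside each worker JVM, not in the process that calls main, so a message logged here appears once per worker JVM rather than once per job. For a single JOB-BEGIN-EVENT, logging it in main before the pipeline is run may be the better fit.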
Upvotes: 0
Reputation: 2024
If you need the initialized object at runtime (not at pipeline construction time), you should move your initialization logic into a Beam DoFn. DoFn has a number of method annotations that mark methods to be executed in different lifecycle phases; the @Setup and @StartBundle annotations might be useful for your use case. See here for more details.
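A minimal sketch of the lifecycle approach, assuming the Beam Java SDK. ConfiguredFn and the in-memory Map are hypothetical stand-ins for the real DoFn and for whatever Configfactory would build. The DoFn is a top-level class and the field is transient, so nothing non-serializable travels with it; the object is rebuilt on the worker in @Setup.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.beam.sdk.transforms.DoFn;

// Top-level DoFn: it captures no enclosing pipeline object.
public class ConfiguredFn extends DoFn<String, String> {

    private static final Logger LOG = Logger.getLogger(ConfiguredFn.class.getName());

    // transient: built on the worker in @Setup, never serialized with the DoFn.
    private transient Map<String, String> config;

    @Setup
    public void setup() {
        // Called once per DoFn instance on the worker, before any bundle.
        // Hypothetical stand-in for "new Configfactory()".
        config = new HashMap<>();
        config.put("prefix", "cfg-");
        LOG.info("DOFN-SETUP-EVENT " + Instant.now());
    }

    @StartBundle
    public void startBundle() {
        // Called at the start of every bundle; keep per-bundle work cheap.
        LOG.fine("bundle started");
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
        out.output(config.get("prefix") + element);
    }

    @Teardown
    public void teardown() {
        // Best-effort cleanup: Beam does not guarantee that teardown is called.
        config = null;
    }
}
```

It would be applied as usual, e.g. input.apply(ParDo.of(new ConfiguredFn())). Since a worker may create several DoFn instances, @Setup runs once per instance rather than once per job, which suits expensive per-instance objects such as clients; @StartBundle runs for every bundle and suits cheap per-bundle state.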
Upvotes: 3