Reputation: 525
I tried to drain a pipeline programmatically. The first part of the code runs the pipeline, which is invoked on a separate thread. The program then sleeps for some time and tries to drain the pipeline. I ran this on Dataflow and it didn't work: the pipeline starts, but the draining code that follows never seems to execute. Please let me know whether this is possible.
I tried adding logging to see how much of the program is executed, but it looks like Dataflow only shows the worker logs, so I couldn't tell how far execution got. I believe the code after the pipeline run is not executed.
DataflowRunner runner = DataflowRunner.fromOptions(options);
DataflowPipelineJob pp = null;
// to run the pipeline which calls pipeline.run
new Thread(() -> runMethod(pp, runner, options)).start();
// Draining below
try {
    Thread.sleep(360000);
    GoogleCredential credential;
    credential = GoogleCredential.getApplicationDefault();
    if (credential.createScopedRequired()) {
        credential = credential.createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
    }
    HttpTransport httpTransport;
    httpTransport = GoogleNetHttpTransport.newTrustedTransport();
    JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
    Dataflow client = new Dataflow.Builder(httpTransport, jsonFactory, credential)
            .setApplicationName("Google Cloud Platform Sample 1")
            .build();
    Job content = new Job();
    content.setProjectId("sample-id");
    content.setId(pp.getJobId());
    content.setRequestedState("JOB_STATE_DRAINING");
    client.projects().jobs()
            .update("sample-id", pp.getJobId(), content)
            .execute();
} catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (GeneralSecurityException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}
Upvotes: 1
Views: 398
Reputation: 1004
The pipeline keeps draining for as long as there is in-flight data. It stops ingesting new data, and once all the data already in the pipeline has been processed, the job stops.
Two things to check:
1. Where are you getting the data from?
2. All the data could have been processed during the sleep time.
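One more thing that may be worth checking in the posted snippet: `pp` is initialized to `null` and passed into `runMethod` by value, so whatever `runMethod` assigns internally cannot change the caller's `pp`. As far as the snippet shows, `pp.getJobId()` would then throw a `NullPointerException`, which none of the three `catch` blocks handle. A minimal sketch of one way to hand the job back from the background thread is below. `FakeJob` and `submitPipeline` are hypothetical stand-ins (for `DataflowPipelineJob` and for the code that calls `pipeline.run()`), used only so the example runs without the Dataflow libraries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DrainHandoffSketch {

    // Hypothetical stand-in for DataflowPipelineJob; only getJobId() is modeled.
    static final class FakeJob {
        private final String jobId;

        FakeJob(String jobId) {
            this.jobId = jobId;
        }

        String getJobId() {
            return jobId;
        }
    }

    // Hypothetical stand-in for the code that calls pipeline.run() and
    // returns the resulting job handle.
    static FakeJob submitPipeline() {
        return new FakeJob("example-job-id");
    }

    // Runs the submission on another thread and exposes the job handle
    // through a future instead of relying on a shared local variable.
    static CompletableFuture<FakeJob> submitAsync() {
        return CompletableFuture.supplyAsync(DrainHandoffSketch::submitPipeline);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<FakeJob> pending = submitAsync();

        // Block until the handle is available (or time out), so the drain
        // request is never built from a null reference.
        FakeJob job = pending.get(1, TimeUnit.MINUTES);

        // The real code would send the JOB_STATE_DRAINING update here.
        System.out.println("Would request drain for job " + job.getJobId());
    }
}
```

In the real program the future would carry the `DataflowPipelineJob` returned by the runner, and the existing `jobs().update(...)` call would use its job ID after whatever wait is appropriate.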
Upvotes: 0