Roshan Fernando

Reputation: 525

Dataflow to drain a pipeline programmatically

I am trying to drain a pipeline programmatically. The first part of the code runs the pipeline, which is invoked on a separate thread. The program then sleeps for some time and tries to drain the pipeline. I tried running this on Dataflow, and it didn't work: the pipeline starts, but the draining code afterwards never seems to execute. Please let me know if this is possible.

I tried to use logging to see how far the program got, but it looks like Dataflow only surfaces the worker logs, so I couldn't tell where it stopped. I believe the code after the pipeline run is not executed.

    import java.io.IOException;
    import java.security.GeneralSecurityException;
    import java.util.Collections;
    import java.util.concurrent.atomic.AtomicReference;

    import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.http.HttpTransport;
    import com.google.api.client.json.JsonFactory;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.dataflow.Dataflow;
    import com.google.api.services.dataflow.model.Job;

    DataflowRunner runner = DataflowRunner.fromOptions(options);

    // Hold the job in an AtomicReference so the assignment made on the
    // background thread is visible here; assigning to a plain local
    // variable inside runMethod would leave pp null in this scope.
    AtomicReference<DataflowPipelineJob> pp = new AtomicReference<>();

    // Run the pipeline on a separate thread; runMethod calls pipeline.run()
    // and stores the resulting DataflowPipelineJob in pp.
    new Thread(() -> runMethod(pp, runner, options)).start();

    // Draining below
    try {
        Thread.sleep(360000); // give the job time to start before draining

        // Build an authenticated Dataflow API client.
        GoogleCredential credential = GoogleCredential.getApplicationDefault();
        if (credential.createScopedRequired()) {
            credential = credential.createScoped(
                    Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
        }
        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
        Dataflow client = new Dataflow.Builder(httpTransport, jsonFactory, credential)
                .setApplicationName("Google Cloud Platform Sample 1")
                .build();

        // Ask the service to drain the job by updating its requested state.
        Job content = new Job();
        content.setProjectId("sample-id");
        content.setId(pp.get().getJobId());
        content.setRequestedState("JOB_STATE_DRAINING");
        client.projects().jobs()
                .update("sample-id", pp.get().getJobId(), content)
                .execute();
    } catch (IOException | GeneralSecurityException | InterruptedException e) {
        e.printStackTrace();
    }
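runMethod itself is omitted above; roughly, it builds the pipeline, runs it on Dataflow, and publishes the job handle, along these lines (the body here is only a sketch, not my exact code):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

    // Sketch of runMethod: builds the pipeline, submits it to Dataflow,
    // and publishes the resulting job so the draining code can find it.
    static void runMethod(AtomicReference<DataflowPipelineJob> pp,
                          DataflowRunner runner, DataflowPipelineOptions options) {
        Pipeline pipeline = Pipeline.create(options);
        // ... apply the pipeline's transforms here ...
        pp.set(runner.run(pipeline)); // DataflowRunner.run returns a DataflowPipelineJob
    }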

Upvotes: 1

Views: 398

Answers (1)

Nathan Nasser

Reputation: 1004

When you drain a pipeline, it stops ingesting new data but keeps processing whatever is already in flight; once all of that data is processed, the job stops.

1. Where are you getting the data from?
2. All of the data could already be processed during the sleep time.
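If the second case applies, you can check the job state before requesting the drain. A minimal sketch using the job handle from your code (assuming pp holds the submitted DataflowPipelineJob):

    import org.apache.beam.sdk.PipelineResult;

    // Only request the drain if the job is still running; a bounded
    // source may already have finished during the sleep.
    PipelineResult.State state = pp.get().getState();
    if (state == PipelineResult.State.RUNNING) {
        // issue the JOB_STATE_DRAINING update as in your question
    } else {
        System.out.println("Job already finished with state: " + state);
    }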

Upvotes: 0
