Champer

Reputation: 95

Parallel pipeline inside one Dataflow Job

I want to run two parallel pipelines inside one Dataflow job on GCP. I've already created one pipeline and it works fine, but I want to add another one without creating another job.

I've searched for an answer so much but couldn't find any code examples :(

It doesn't work if I run it like this:

pipe1.run();
pipe2.run();

It fails with: "There is already an active job name... If you want to submit a second job, try again setting a different name using --jobName"

Upvotes: 2

Views: 2546

Answers (1)

jszule

Reputation: 432

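Calling run() on two separate Pipeline objects submits two separate jobs, so Dataflow rejects the second one while a job with the same name is still active. Instead, apply additional inputs to the same Pipeline. Each input starts an independent branch, which gives you separate pipelines inside one job. E.g.: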

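import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class ExamplePipeline {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.setRunner(DirectRunner.class);

        // One Pipeline object: everything applied to it is submitted as a single job.
        Pipeline pipeline = Pipeline.create(options);

        // Branch 1: its own source, transform and file sink.
        PCollection<String> linesForPipelineOne = pipeline.apply("Create input 1", Create.of("A1", "B1"));
        PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
                ParDo.of(new DoFn<String, String>() {

            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println("Pipeline one:" + c.element());
                c.output(c.element() + " extra message.");
            }

        }));
        linesToWriteFromPipelineOne.apply("Write pipeline 1 output", TextIO.write().to("file.txt"));

        // Branch 2: a second, unconnected source applied to the same pipeline.
        PCollection<String> linesForPipelineTwo = pipeline.apply("Create input 2", Create.of("A2", "B2"));
        linesForPipelineTwo.apply("Pipeline 2 transform",
                ParDo.of(new DoFn<String, String>() {

            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println("Pipeline two:" + c.element());
            }

        }));

        // A single run() launches both branches in one job.
        pipeline.run();
    }
}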

As you can see, you can apply two (or more) separate PBegin roots to one pipeline, each ending in its own PDone/sink. In this example, "pipeline 1" prints its elements and writes the output to a file, while "pipeline 2" only prints them to the screen.

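If you run this with the DataflowRunner on GCP, the Dataflow console shows the job graph as two unconnected "pipelines". Note that on Dataflow the System.out output ends up in the worker logs rather than in your local console.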
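To make the "one job, two unconnected graphs" point concrete, here is a toy model in plain Java. It does not use any Beam or Dataflow API: it simply treats a job as a list of edges between step names (the names are hypothetical, loosely mirroring the example above) and counts weakly connected components with union-find. Branches that never share a PCollection end up as separate components, which is what the console draws as separate graphs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Beam API): a job graph as edges between named steps. */
public class JobGraphComponents {

    private final Map<String, String> parent = new HashMap<>();

    // Union-find "find" with path compression; an unseen step becomes its own root.
    private String find(String node) {
        parent.putIfAbsent(node, node);
        String root = parent.get(node);
        if (!root.equals(node)) {
            root = find(root);
            parent.put(node, root);
        }
        return root;
    }

    // Merge the components containing a and b.
    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    /** Counts weakly connected components; edge direction is ignored. */
    public static int countComponents(List<String[]> edges) {
        JobGraphComponents uf = new JobGraphComponents();
        for (String[] edge : edges) {
            uf.union(edge[0], edge[1]);
        }
        int roots = 0;
        for (String node : new ArrayList<>(uf.parent.keySet())) {
            if (uf.find(node).equals(node)) {
                roots++;
            }
        }
        return roots;
    }

    public static void main(String[] args) {
        // Hypothetical step names for the two branches of the example job.
        List<String[]> edges = Arrays.asList(
                new String[] {"Create(A1, B1)", "Pipeline 1 transform"},
                new String[] {"Pipeline 1 transform", "TextIO.write"},
                new String[] {"Create(A2, B2)", "Pipeline 2 transform"});
        System.out.println(countComponents(edges) + " unconnected graph(s) in one job");
    }
}
```

Running main prints "2 unconnected graph(s) in one job". If the two branches were later joined (for example, by feeding both into one transform), an edge would connect them and the count would drop to 1.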

Upvotes: 5
