I want to include both my own custom pipeline options and the Dataflow-specific options in my pipeline. Now, when trying to extend DataflowPipelineOptions:
public interface CustomPipelineOptions extends DataflowPipelineOptions {
@Description("Sample parameter description")
ValueProvider<String> getSampleParameter();
void setSampleParameter(ValueProvider<String> sampleParameter);
// more custom parameters below...
}
And passing my options of type CustomPipelineOptions to run() in my main() function:
public static void main(String[] args) {
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
// set Dataflow specific options
options.setProject("my-project");
options.setRegion("my-region");
options.setStagingLocation("gs://my-bucket/location");
options.setTempLocation("gs://my-bucket/location");
options.setSubnetwork("regions/my-region/subnetworks/my-subnetwork");
options.setJobName("my-job-name");
options.setUsePublicIps(false);
options.setRunner(DataflowRunner.class);
run(options);
}
(Note that in the above I configured various DataflowPipelineOptions options as outlined in the javadoc.)
Here is where I create my pipeline with options of type CustomPipelineOptions:
static void run(CustomPipelineOptions options) {
/*
Define pipeline
*/
Pipeline p = Pipeline.create(options);
// function continues below...
}
Additionally, I have included the following relevant dependencies in my pom.xml file (note that ${beam.version} is 2.31.0 and ${slf4j.version} is 1.7.25):
<dependencies>
<!-- core beam SDK -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- gcp package -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- dataflowRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!-- directRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- slf4j; logging for java -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
and when I execute my pipeline with the command:
mvn compile exec:java -Dexec.mainClass=path.to.class.myClass
or with:
mvn compile exec:java -Dexec.mainClass=path.to.class.myClass -Pdataflow-runner
I get the following errors:
Compilation failure:
[ERROR] /C:/Users/path/to/class/myClass.java:[18,40] package org.apache.beam.runners.dataflow does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[19,48] package org.apache.beam.runners.dataflow.options does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[56,52] cannot find symbol
[ERROR] symbol: class DataflowPipelineOptions
[ERROR] location: class path.to.class.myClass
[ERROR] /C:/Users/path/to/class/myClass.java:[82,38] incompatible types: path.to.class.myClass.CustomPipelineOptions cannot be converted to org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[145,67] method as in class org.apache.beam.sdk.options.PipelineOptionsFactory.Builder cannot be applied to given types;
and:
[ERROR] required: java.lang.Class<T>
[ERROR] found: java.lang.Class<path.to.class.myClass.CustomPipelineOptions>
[ERROR] reason: inference variable T has incompatible bounds
[ERROR] equality constraints: path.to.class.myClass.CustomPipelineOptions
[ERROR] upper bounds: org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[150,16] cannot find symbol
[ERROR] symbol: method setProject(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[151,16] cannot find symbol
[ERROR] symbol: method setRegion(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[152,16] cannot find symbol
[ERROR] symbol: method setStagingLocation(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[153,16] cannot find symbol
[ERROR] symbol: method setTempLocation(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[154,16] cannot find symbol
[ERROR] symbol: method setSubnetwork(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[155,16] cannot find symbol
[ERROR] symbol: method setJobName(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[156,16] cannot find symbol
[ERROR] symbol: method setUsePublicIps(boolean)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[157,27] cannot find symbol
[ERROR] symbol: class DataflowRunner
[ERROR] location: class path.to.class.myClass
Any ideas as to why I get these errors, as well as how to accomplish my goal of including both my own pipeline options defined in CustomPipelineOptions and the Dataflow-specific DataflowPipelineOptions, would be greatly appreciated. Thanks!
The first problem is in the pom.xml file: we need to change
<!-- dataflowRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope> <!-- delete this! -->
</dependency>
to
<!-- dataflowRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
</dependency>
As explained in the dependency scope section of the Maven documentation, the runtime scope indicates that a dependency is "not required for compilation, but is for execution." We do need this dependency during compilation, so we remove the scope element and fall back to the default scope, compile.
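For reference, these are the imports in myClass.java that the compiler could not resolve while the runner artifact was runtime-scoped (taken from the packages and symbols named in the error log); with the default compile scope they resolve as expected:
// imports provided by beam-runners-google-cloud-dataflow-java,
// which must be on the compile-time classpath
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;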
The second problem is in how the options are defined in main():
public static void main(String[] args) {
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
The solution is to get rid of withValidation():
public static void main(String[] args) {
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomPipelineOptions.class);
PipelineOptionsValidator's validate() method "Validates that the passed PipelineOptions conforms to all the validation criteria from the passed interface." Since our options are only assigned after withValidation() has already run, validation fails and so does the pipeline.
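If you still want a validation step, one possibility (a minimal sketch, assuming all required values are set programmatically as in the question) is to validate explicitly after assigning the values, using PipelineOptionsValidator directly:
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;

public static void main(String[] args) {
    CustomPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(CustomPipelineOptions.class);

    // set Dataflow specific options first...
    options.setProject("my-project");
    options.setRegion("my-region");
    // ...remaining setters as in the question...

    // ...then validate, now that the required values are present
    PipelineOptionsValidator.validate(CustomPipelineOptions.class, options);

    run(options);
}
This performs the same checks withValidation() would, but only after the values being checked have actually been set.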