Reputation: 41
We are trying to start a Dataflow batch job using an executable jar file:
mvn package
java -jar DataFlow-jobs-0.1.jar \
  --tempLocation=gs://events-dataflow/tmp \
  --gcpTempLocation=gs://events-dataflow/tmp \
  --project=google-project-id \
  --runner=DataflowRunner \
  --BQQuery='select t1.user_id from google-project-id.deve.user_info t1'
Exception in thread "main" java.lang.IllegalArgumentException: Class interface org.apache.beam.sdk.options.PipelineOptions missing a property named 'gcpTempLocation'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1579)
at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:104)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:291)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.create(PipelineOptionsFactory.java:270)
at org.customerlabs.beam.WriteFromBQtoES.main(WriteFromBQtoES.java:98)
pom.xml:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <appendAssemblyId>false</appendAssemblyId>
    <archive>
      <manifest>
        <mainClass>org.customerlabs.beam.WriteFromBQtoES</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-executable-jar</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
WriteFromBQtoES.java
public class WriteFromBQtoES {
    private static DateTimeFormatter fmt =
        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    private static final Logger LOG = LoggerFactory.getLogger(WriteFromBQtoES.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    public interface Options extends PipelineOptions {
        @Description("BigQuery query to fetch data")
        @Required
        String getBQQuery();
        void setBQQuery(String value);
    }

    public static void main(String[] args) throws IOException {
        PipelineOptionsFactory.register(Options.class);
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(Options.class);
        Pipeline p = Pipeline.create(options);
        PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(options.getBQQuery()).usingStandardSql());
        tableRows.apply("WriteToCSV", ParDo.of(new DoFn<TableRow, String>() {
            // process WriteToCSV
        }));
        p.run();
    }
}
public static void main(String[] args) throws IOException {
    PipelineOptionsFactory.register(Options.class);
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(Options.class);
    String query = options.getBQQuery();
    Pipeline p = Pipeline.create(options);
    .....
    ..... pipeline operations.....
    .....
}
I'm not sure what we are missing; we are passing the gcpTempLocation argument on the command line, yet we still get this error. Please help us figure out the issue. Thanks in advance.
Upvotes: 2
Views: 2108
Reputation: 21
I've faced the same issue, except that I used the Maven Shade plugin to create an uber jar containing all the dependencies required by the application. Executing the jar file with the parameters required by Apache Beam resulted in the same error, where gcpTempLocation could not be found. Adding the following block to your pom.xml lets you use Maven Shade to package your uber jar and also solves the missing-parameter issue.
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <!-- Run shade goal on package phase -->
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Required to ensure Beam pipeline options can be passed properly.
               Without this, pipeline options will not be recognised -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <!-- add Main-Class to manifest file -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>NAME-OF-YOUR-MAIN-CLASS</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
The ServicesResourceTransformer line ensures that the Beam pipeline options can be passed through command-line params. After adding this to your pom.xml, run mvn package, which will generate an uber jar file under target/ in the project root. You can then execute your jar file with the following command:
java -jar target/[your-jar-name].jar \
--runner=org.apache.beam.runners.dataflow.DataflowRunner \
--tempLocation=[GCS temp folder path] \
--stagingLocation=[GCS staging folder path]
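For context, the reason the ServicesResourceTransformer matters is that Beam discovers additional PipelineOptions interfaces (such as GcpOptions, where gcpTempLocation is declared) through ServiceLoader entries under META-INF/services; an uber jar that overwrites those entries instead of merging them loses the registrations. Below is a minimal sketch you could run against your packaged jar to see which options interfaces were actually registered (the class name here is just an illustration, not part of the original post):
import java.util.ServiceLoader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;

public class ListRegisteredOptions {
    public static void main(String[] args) {
        // Beam finds extra PipelineOptions interfaces via ServiceLoader; if the uber jar
        // did not merge META-INF/services, DataflowPipelineOptions/GcpOptions never show up here.
        for (PipelineOptionsRegistrar registrar : ServiceLoader.load(PipelineOptionsRegistrar.class)) {
            for (Class<? extends PipelineOptions> opts : registrar.getPipelineOptions()) {
                System.out.println(opts.getName());
            }
        }
    }
}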
Upvotes: 2
Reputation: 819
I think instead of PipelineOptions you want:
public interface Options extends DataflowPipelineOptions { ... }
gcpTempLocation is defined in GcpOptions.java, which is extended by DataflowPipelineOptions.java.
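For reference, a minimal sketch of what that change could look like (the BQQuery option is taken from the question; the wrapper class and the assumption that the Dataflow runner dependency is on the classpath are mine):
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;

public class OptionsExample {

    // Extending DataflowPipelineOptions pulls in GcpOptions, which declares gcpTempLocation.
    public interface Options extends DataflowPipelineOptions {
        @Description("BigQuery query to fetch data")
        @Validation.Required
        String getBQQuery();
        void setBQQuery(String value);
    }

    public static void main(String[] args) {
        // Parse directly into the custom interface; --gcpTempLocation is now a known property.
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        System.out.println(options.getGcpTempLocation());
    }
}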
Upvotes: 2