vishnuixm

Reputation: 41

Invoke Dataflow process from jar file, having error PipelineOptions missing a property named 'gcpTempLocation'

Requirement

We are trying to initiate a Dataflow batch process using an executable jar file.

Process followed

Output

Exception in thread "main" java.lang.IllegalArgumentException: Class interface org.apache.beam.sdk.options.PipelineOptions missing a property named 'gcpTempLocation'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1579)
at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:104)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:291)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.create(PipelineOptionsFactory.java:270)
at org.customerlabs.beam.WriteFromBQtoES.main(WriteFromBQtoES.java:98)

Code

pom.xml
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>

    <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <archive>
            <manifest>
                <mainClass>org.customerlabs.beam.WriteFromBQtoES</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>

    <executions>
        <execution>
            <id>make-executable-jar</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
  </plugin>
WriteFromBQtoES.java
public class WriteFromBQtoES {
    private static DateTimeFormatter fmt =
        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    private static final Logger LOG = LoggerFactory.getLogger(WriteFromBQtoES.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    public interface Options extends PipelineOptions {
        @Description("Bigquery query to fetch data")
        @Required
        String getBQQuery();
        void setBQQuery(String value);
    }

    public static void main(String[] args) throws IOException{
        PipelineOptionsFactory.register(Options.class);
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(Options.class);

        Pipeline p = Pipeline.create(options);
        PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(options.getBQQuery()).usingStandardSql());

        tableRows.apply("WriteToCSV", ParDo.of(new DoFn<TableRow, String>() {
            // process WriteToCSV
        }));
        p.run();
    }
}


I am not sure what we are missing; we keep getting this error even though we are passing the gcpTempLocation argument on the command line. Please help us find the issue. Thanks in advance.

Upvotes: 2

Views: 2108

Answers (2)

Rick Wong

Reputation: 21

I faced the same issue, except that I used the Maven Shade plugin to create an uber jar containing all the dependencies required by the application. Executing that jar with the parameters required by Apache Beam produced the same error: gcpTempLocation could not be found. Adding the following block to your pom.xml lets you package the uber jar with Maven Shade and also fixes the missing-parameter issue.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>${maven-shade-plugin.version}</version>
  <executions>
    <!-- Run shade goal on package phase -->
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Required to ensure Beam Pipeline options can be passed properly. Without this, pipeline options will not be recognised -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
          <!-- add Main-Class to manifest file -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>NAME-OF-YOUR-MAIN-CLASS</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

The ServicesResourceTransformer line ensures that the Beam pipeline options can be passed through command-line parameters. After adding this to your pom.xml, run mvn package, which generates an uber jar file in target/. You can then execute your jar file with the following command:

java -jar target/[your-jar-name].jar \
--runner=org.apache.beam.runners.dataflow.DataflowRunner \
--tempLocation=[GCS temp folder path] \
--stagingLocation=[GCS staging folder path]
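To see why the ServicesResourceTransformer matters, here is a miniature sketch of what it does: several dependency jars each ship an identically named META-INF/services registrar file, and the transformer concatenates them instead of letting one clobber the others. (The org.example.* registrar class names below are made up for illustration.)

```shell
# Two "dependency jars" each declare a PipelineOptionsRegistrar service file.
svc=org.apache.beam.sdk.options.PipelineOptionsRegistrar
mkdir -p depA/META-INF/services depB/META-INF/services uber/META-INF/services

echo 'org.example.FooRegistrar' > depA/META-INF/services/$svc
echo 'org.example.BarRegistrar' > depB/META-INF/services/$svc

# Without the transformer, a naive merge keeps only one of these files, and
# Beam's PipelineOptionsFactory never sees the options the lost registrar
# declared. The transformer concatenates them, like so:
cat depA/META-INF/services/$svc depB/META-INF/services/$svc \
    > uber/META-INF/services/$svc

cat uber/META-INF/services/$svc
```

Beam registers GCP and Dataflow options (including gcpTempLocation) through exactly this service-loader mechanism, which is why dropping the service files during packaging makes the property "missing".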

Upvotes: 2

Slava Chernyak

Reputation: 819

I think instead of PipelineOptions you want:

public interface Options extends DataflowPipelineOptions { ... }

gcpTempLocation is defined in GcpOptions.java, which DataflowPipelineOptions.java extends.
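The reason this matters: PipelineOptionsFactory discovers properties from the getters on the options interface (and its superinterfaces), so a plain PipelineOptions simply has no gcpTempLocation property. A toy model of the hierarchy, with stand-in interfaces defined locally for illustration (they only mirror the names of Beam's real ones):

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class OptionsHierarchyDemo {
    // Stand-ins mirroring Beam's interface hierarchy, defined here for the demo.
    public interface PipelineOptions { }
    public interface GcpOptions extends PipelineOptions {
        String getGcpTempLocation();
    }
    public interface DataflowPipelineOptions extends GcpOptions { }

    // Property discovery by getter name, as a stand-in for what
    // PipelineOptionsFactory does via reflection over the interface.
    public static boolean hasProperty(Class<?> iface, String getter) {
        return Arrays.stream(iface.getMethods())
                     .anyMatch((Method m) -> m.getName().equals(getter));
    }

    public static void main(String[] args) {
        // Plain PipelineOptions: no such getter, hence the "missing a property" error.
        System.out.println(hasProperty(PipelineOptions.class, "getGcpTempLocation"));         // false
        // DataflowPipelineOptions inherits the getter from GcpOptions.
        System.out.println(hasProperty(DataflowPipelineOptions.class, "getGcpTempLocation")); // true
    }
}
```

So extending DataflowPipelineOptions (or GcpOptions) makes the gcpTempLocation getter visible to the factory, and the command-line argument can be bound.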

Upvotes: 2
