AAlferez

Reputation: 1502

DataflowRunner not working

I'm implementing a job in Apache Beam that should ultimately run on Dataflow. I tested it with the DirectRunner, but when I switch to the DataflowRunner, it crashes:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:293)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod (InstanceBuilder.java:233)
    at org.apache.beam.sdk.util.InstanceBuilder.build (InstanceBuilder.java:162)
    at org.apache.beam.sdk.PipelineRunner.fromOptions (PipelineRunner.java:52)
    at org.apache.beam.sdk.Pipeline.create (Pipeline.java:142)
    at com.lf.myApacheBeam.MemoryTestProject.MemoryTest.main (MemoryTest.java:171)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:293)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod (InstanceBuilder.java:222)
    at org.apache.beam.sdk.util.InstanceBuilder.build (InstanceBuilder.java:162)
    at org.apache.beam.sdk.PipelineRunner.fromOptions (PipelineRunner.java:52)
    at org.apache.beam.sdk.Pipeline.create (Pipeline.java:142)
    at com.lf.myApacheBeam.MemoryTestProject.MemoryTest.main (MemoryTest.java:171)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:293)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.setUserAgent(Ljava/lang/String;)V
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions (DataflowRunner.java:304)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod (InstanceBuilder.java:222)
    at org.apache.beam.sdk.util.InstanceBuilder.build (InstanceBuilder.java:162)
    at org.apache.beam.sdk.PipelineRunner.fromOptions (PipelineRunner.java:52)
    at org.apache.beam.sdk.Pipeline.create (Pipeline.java:142)
    at com.lf.myApacheBeam.MemoryTestProject.MemoryTest.main (MemoryTest.java:171)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:293)
    at java.lang.Thread.run (Thread.java:748)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.287 s
[INFO] Finished at: 2018-01-30T14:32:51-06:00
[INFO] Final Memory: 38M/376M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project word-count-beam: An exception occured while executing the Java class. null: InvocationTargetException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions): org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.setUserAgent(Ljava/lang/String;)V -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

I run my code with:

 mvn compile exec:java -Dexec.mainClass=com.lf.myApacheBeam.MemoryTestProject.MemoryTest -Dexec.args="--runner=DataflowRunner"

If I change --runner=DataflowRunner to --runner=DirectRunner, my code runs fine.

The line where it crashes is:

Pipeline pipeline = Pipeline.create(options);

And my 'options' are declared as:

MemoryTestExtractOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(MemoryTestExtractOptions.class);
options.setStreaming(true);
//options.setRunner(DataflowRunner.class);
options.setTempLocation("gs://blahblahblah/temp/");
options.setBigQuerySchema(TestResultToRowConverter.getSchema());

Finally my interface 'MemoryTestExtractOptions' is:

private interface MemoryTestExtractOptions
    extends Options, BigQueryTableOptionsForMemoryTest, StreamingOptions,
        PubsubTopicAndSubscriptionOptions, DataflowPipelineOptions {

  @Description("BigQuery table to write to, specified as "
      + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
  String getOutput();
  void setOutput(String value);
}

The error complains about DataflowPipelineOptions.setUserAgent, but I never call it, and I have checked that my options are all well formed. Am I missing something specific to Dataflow? Thank you.
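A java.lang.NoSuchMethodError generally means the calling class (here DataflowRunner) was compiled against a version of another class that has a method the version loaded at runtime lacks. That usually points to mismatched jar versions on the classpath rather than to your own code. Below is a minimal diagnostic sketch, assuming it runs with the same classpath as the pipeline: it reports whether DataflowPipelineOptions exposes setUserAgent(String) and which jars DataflowPipelineOptions and the core PipelineOptions were loaded from. ClasspathCheck and its methods are hypothetical names for illustration, not part of Beam.

```java
import java.security.CodeSource;

/** Hypothetical helper for diagnosing NoSuchMethodError: reports where a class
 *  was loaded from and whether it exposes a given public method. */
public class ClasspathCheck {

    /** Returns the jar or directory the class was loaded from, or a note if unknown. */
    public static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader (e.g. JDK classes) have no CodeSource.
        if (source == null || source.getLocation() == null) {
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    /** True if the named class can be loaded and has the public method,
     *  declared directly or inherited (getMethod also searches superinterfaces). */
    public static boolean hasPublicMethod(String className, String methodName,
                                          Class<?>... paramTypes) {
        try {
            Class<?> cls = Class.forName(className);
            cls.getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names taken from the stack trace in the question.
        String dataflowOptions =
            "org.apache.beam.runners.dataflow.options.DataflowPipelineOptions";
        String coreOptions = "org.apache.beam.sdk.options.PipelineOptions";
        System.out.println("setUserAgent(String) present: "
            + hasPublicMethod(dataflowOptions, "setUserAgent", String.class));
        for (String name : new String[] {dataflowOptions, coreOptions}) {
            try {
                System.out.println(name + " loaded from " + locationOf(Class.forName(name)));
            } catch (ClassNotFoundException | LinkageError e) {
                System.out.println(name + " could not be loaded: " + e);
            }
        }
    }
}
```

If the jar file names printed for the two classes carry different Beam versions, aligning those versions in pom.xml is the likely fix.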

Upvotes: 0

Views: 4503

Answers (2)

AAlferez

Reputation: 1502

Found the problem: my pom.xml specified a hard-coded version for the Dataflow runner dependency instead of using ${beam.version}.

I changed it to ${beam.version}, and it worked like a charm.

Thanks for the hints that pointed me in the right direction.
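A sketch of how a pinned runner version like this could be spotted, assuming you feed it Beam coordinates in the groupId:artifactId:type:version:scope form printed by mvn dependency:list, with the [INFO] prefix removed (the exact output can differ between Maven versions, and entries with a classifier are skipped). BeamVersionCheck is a hypothetical helper, and the versions in main are made-up sample input, not taken from the question. Running mvn dependency:tree -Dincludes=org.apache.beam gives a similar view directly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper: groups org.apache.beam artifacts by version so that a
 *  runner pinned to a different version than the SDK stands out. */
public class BeamVersionCheck {

    /** Maps each version to the Beam artifactIds resolved at that version.
     *  Expects entries like "org.apache.beam:artifact:jar:VERSION:scope". */
    public static Map<String, Set<String>> beamVersions(List<String> coordinates) {
        Map<String, Set<String>> byVersion = new TreeMap<>();
        for (String raw : coordinates) {
            String[] parts = raw.trim().split(":");
            // Skip non-Beam artifacts and any other layout (e.g. with a classifier).
            if (parts.length != 5 || !parts[0].equals("org.apache.beam")) {
                continue;
            }
            byVersion.computeIfAbsent(parts[3], v -> new TreeSet<>()).add(parts[1]);
        }
        return byVersion;
    }

    public static void main(String[] args) {
        // Hypothetical sample input for illustration only.
        List<String> deps = Arrays.asList(
            "org.apache.beam:beam-sdks-java-core:jar:2.2.0:compile",
            "org.apache.beam:beam-runners-google-cloud-dataflow-java:jar:2.0.0:runtime",
            "com.google.guava:guava:jar:20.0:compile");
        Map<String, Set<String>> versions = beamVersions(deps);
        if (versions.size() > 1) {
            System.out.println("Mixed Beam versions: " + versions);
        } else {
            System.out.println("Beam versions consistent: " + versions.keySet());
        }
    }
}
```

If more than one version is reported, pointing every Beam artifact at ${beam.version}, as in the fix above, removes the mismatch.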

Upvotes: 3

Scott Wegner

Reputation: 7493

Please make sure that your pom.xml lists the org.apache.beam:beam-runners-google-cloud-dataflow-java artifact as a dependency. If you generated your project from the beam-sdks-java-maven-archetypes-examples archetype, you can add -Pdataflow-runner to your Maven command line.

There is an example of this in the Beam Quickstart instructions under Dataflow: https://beam.apache.org/get-started/quickstart-java/

Upvotes: 3
