Reputation: 41
Use beam 2.6 with Flink 1.5.3 to run test. beam run local flink runner without problem. But unable to run on flink cluster. try both mvn and flink to submit the job. when use mvn to run, I used:
mvn clean package -Pflink-runner exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=FlinkRunner --flinkMaster=c2:6123 --filesToStage=target/word-count-beam-bundled-0.1.jar"
this cause to problem block on Requesting blob server port, and unable to continue
Sep 21, 2018 11:47:38 AM org.apache.zookeeper.ClientCnxn$SendThread logStartConnect INFO: Opening socket connection to server 192.168.0.12/192.168.0.12:2181. Will not attempt to authenticate using SASL (unknown error) Sep 21, 2018 11:47:38 AM org.apache.zookeeper.ClientCnxn$SendThread primeConnection INFO: Socket connection established to 192.168.0.12/192.168.0.12:2181, initiating session Sep 21, 2018 11:47:38 AM org.apache.zookeeper.ClientCnxn$SendThread onConnected INFO: Session establishment complete on server 192.168.0.12/192.168.0.12:2181, sessionid = 0x165adfcdcd9104b, negotiated timeout = 90000 2018-09-21 11:47:38 INFO ConnectionManager$HConnectionImplementation:2155 - Closing master protocol: MasterService 2018-09-21 11:47:38 INFO ConnectionManager$HConnectionImplementation:1712 - Closing zookeeper sessionid=0x165adfcdcd9104b Sep 21, 2018 11:47:38 AM org.apache.zookeeper.ClientCnxn$EventThread run INFO: EventThread shut down Sep 21, 2018 11:47:38 AM org.apache.zookeeper.ZooKeeper close INFO: Session: 0x165adfcdcd9104b closed Sep 21, 2018 11:47:39 AM org.apache.flink.client.program.rest.RestClusterClient submitJob INFO: Submitting job aa366f2bd4bff3ddab47c1a890c84256 (detached: false). Sep 21, 2018 11:47:39 AM org.apache.flink.client.program.rest.RestClusterClient submitJob INFO: Requesting blob server port.
when use flink to submit to job directly by using:
$FLINK_HOME/bin/flink run -c org.apache.beam.examples.WordCount target/test-beam-bundled-0.1.jar --runner=FlinkRunner --flinkMaster=c2:6123 --filesToStage=target/word-count-beam-bundled-0.1.jar
it throw following exception:
The program finished with the following exception:
The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment) org.apache.flink.api.java.RemoteEnvironment.(RemoteEnvironment.java:126) org.apache.flink.api.java.RemoteEnvironment.(RemoteEnvironment.java:86) org.apache.flink.api.java.ExecutionEnvironment.createRemoteEnvironment(ExecutionEnvironment.java:1168) org.apache.beam.runners.flink.FlinkExecutionEnvironments.createBatchExecutionEnvironment(FlinkExecutionEnvironments.java:58) org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:93) org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110) org.apache.beam.sdk.Pipeline.run(Pipeline.java:313) org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
Any idea?
Upvotes: 1
Views: 781
Reputation: 41
I find out it is beam version problem. Beam java sdk only can work with Flink 1.5.1. There is a change in Flink 1.5.2+ which the blob upload method is changed. Beam unable to load the jar throught the Flink blob rest api.
Upvotes: 1